trx_toolkit: use RxMsg/TxMsg instead of TRX2L1/L12TRX

I intentionally do not use 'Downlink' and 'Uplink' terms in this project
because both MS and BTS transmit and receive on the opposite directions.
A burst coming from demodulator may be a Downlink or an Uplink burst
depending on the context, so we definitely need more precise terms.

Back then when I started to work on TRX toolkit, I decided to use the
'TRX2L1' and 'L12TRX' for receive and transmit directions respectively.
Now I find them hard to read, so let's replace them with 'Rx' and 'Tx'.

Change-Id: I688f24a3c09dd7e1cc00b5530ec26c8e8cfd8f7c
Related: OS#4006, SYS#4895
This commit is contained in:
Vadim Yanitskiy 2021-04-28 02:05:15 +02:00
parent b2349bc359
commit 1b7b4ec7a4
12 changed files with 117 additions and 121 deletions

View File

@ -37,8 +37,8 @@ class BurstForwarder(TRXList):
- actual RX / TX frequencies, - actual RX / TX frequencies,
- list of active timeslots. - list of active timeslots.
Each to be distributed L12TRX message is being transformed Each to be distributed 'TxMsg' message is being transformed
into a TRX2L1 message, and then forwarded to transceivers into a 'RxMsg' message, and then forwarded to transceivers
with partially initialized header. All uninitialized header with partially initialized header. All uninitialized header
fields (such as rssi and toa256) shall be set by each fields (such as rssi and toa256) shall be set by each
transceiver individually before sending towards the L1. transceiver individually before sending towards the L1.
@ -69,6 +69,6 @@ class BurstForwarder(TRXList):
if trx.get_rx_freq(rx_msg.fn) != tx_freq: if trx.get_rx_freq(rx_msg.fn) != tx_freq:
continue continue
# Transform from L12TRX to TRX2L1 and forward # Transform from TxMsg to RxMsg and forward
tx_msg = rx_msg.gen_trx2l1(ver = trx.data_if._hdr_ver) tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver)
trx.handle_data_msg(src_trx, rx_msg, tx_msg) trx.handle_data_msg(src_trx, rx_msg, tx_msg)

View File

@ -68,9 +68,9 @@ class Application(ApplicationBase):
# Init an empty DATA message # Init an empty DATA message
if self.argv.conn_mode == "TRX": if self.argv.conn_mode == "TRX":
msg = DATAMSG_L12TRX(ver = self.argv.hdr_ver) msg = TxMsg(ver = self.argv.hdr_ver)
elif self.argv.conn_mode == "L1": elif self.argv.conn_mode == "L1":
msg = DATAMSG_TRX2L1(ver = self.argv.hdr_ver) msg = RxMsg(ver = self.argv.hdr_ver)
# Generate a random frame number or use provided one # Generate a random frame number or use provided one
fn_init = msg.rand_fn() if self.argv.tdma_fn is None \ fn_init = msg.rand_fn() if self.argv.tdma_fn is None \
@ -176,7 +176,7 @@ class Application(ApplicationBase):
help = "How many bursts to send (default %(default)s)") help = "How many bursts to send (default %(default)s)")
bg_group.add_argument("-v", "--hdr-version", metavar = "VER", bg_group.add_argument("-v", "--hdr-version", metavar = "VER",
dest = "hdr_ver", type = int, dest = "hdr_ver", type = int,
default = 0, choices = DATAMSG.KNOWN_VERSIONS, default = 0, choices = Msg.KNOWN_VERSIONS,
help = "TRXD header version (default %(default)s)") help = "TRXD header version (default %(default)s)")
bg_group.add_argument("-f", "--frame-number", metavar = "FN", bg_group.add_argument("-f", "--frame-number", metavar = "FN",
dest = "tdma_fn", type = int, dest = "tdma_fn", type = int,

View File

@ -79,11 +79,10 @@ class Application(ApplicationBase):
def msg_pass_filter(self, msg): def msg_pass_filter(self, msg):
# Direction filter # Direction filter
l12trx = self.argv.conn_mode == "TRX" if isinstance(msg, RxMsg) and self.argv.conn_mode == "TRX":
if isinstance(msg, DATAMSG_L12TRX) and not l12trx: return False # cannot send RxMsg to TRX
return False if isinstance(msg, TxMsg) and self.argv.conn_mode == "L1":
elif isinstance(msg, DATAMSG_TRX2L1) and l12trx: return False # cannot send TxMsg to L1
return False
# Timeslot filter # Timeslot filter
if self.argv.pf_tn is not None: if self.argv.pf_tn is not None:

View File

@ -25,7 +25,7 @@
import logging as log import logging as log
from ctrl_if import CTRLInterface from ctrl_if import CTRLInterface
from data_msg import DATAMSG from data_msg import Msg
class CTRLInterfaceTRX(CTRLInterface): class CTRLInterfaceTRX(CTRLInterface):
""" CTRL interface handler for common transceiver management commands. """ CTRL interface handler for common transceiver management commands.
@ -222,7 +222,7 @@ class CTRLInterfaceTRX(CTRLInterface):
# ... and store current for logging # ... and store current for logging
ver_cur = self.trx.data_if._hdr_ver ver_cur = self.trx.data_if._hdr_ver
if ver_req < 0 or ver_req > DATAMSG.CHDR_VERSION_MAX: if ver_req < 0 or ver_req > Msg.CHDR_VERSION_MAX:
log.error("(%s) Incorrect TRXD header version %u" log.error("(%s) Incorrect TRXD header version %u"
% (self.trx, ver_req)) % (self.trx, ver_req))
return -1 return -1

View File

@ -28,18 +28,18 @@ from data_msg import *
class DATADump: class DATADump:
# Constants # Constants
TAG_L12TRX = b'\x01' TAG_TxMsg = b'\x01'
TAG_TRX2L1 = b'\x02' TAG_RxMsg = b'\x02'
HDR_LENGTH = 3 HDR_LENGTH = 3
# Generates raw bytes from a DATA message # Generates raw bytes from a DATA message
# Return value: raw message bytes # Return value: raw message bytes
def dump_msg(self, msg): def dump_msg(self, msg):
# Determine a message type # Determine a message type
if isinstance(msg, DATAMSG_L12TRX): if isinstance(msg, TxMsg):
tag = self.TAG_L12TRX tag = self.TAG_TxMsg
elif isinstance(msg, DATAMSG_TRX2L1): elif isinstance(msg, RxMsg):
tag = self.TAG_TRX2L1 tag = self.TAG_RxMsg
else: else:
raise ValueError("Unknown message type") raise ValueError("Unknown message type")
@ -61,12 +61,10 @@ class DATADump:
tag = hdr[:1] tag = hdr[:1]
# Check if tag is known # Check if tag is known
if tag == self.TAG_L12TRX: if tag == self.TAG_TxMsg:
# L1 -> TRX msg = TxMsg()
msg = DATAMSG_L12TRX() elif tag == self.TAG_RxMsg:
elif tag == self.TAG_TRX2L1: msg = RxMsg()
# TRX -> L1
msg = DATAMSG_TRX2L1()
else: else:
# Unknown tag # Unknown tag
return False return False

View File

@ -35,7 +35,7 @@ class DATAInterface(UDPLink):
log.debug("Init TRXD interface (%s)" % self.desc_link()) log.debug("Init TRXD interface (%s)" % self.desc_link())
def set_hdr_ver(self, ver): def set_hdr_ver(self, ver):
if not ver in DATAMSG.KNOWN_VERSIONS: if not ver in Msg.KNOWN_VERSIONS:
return False return False
self._hdr_ver = ver self._hdr_ver = ver
@ -43,7 +43,7 @@ class DATAInterface(UDPLink):
def pick_hdr_ver(self, ver_req): def pick_hdr_ver(self, ver_req):
# Pick a version that is lower or equal to ver_req # Pick a version that is lower or equal to ver_req
for ver in DATAMSG.KNOWN_VERSIONS[::-1]: for ver in Msg.KNOWN_VERSIONS[::-1]:
if ver <= ver_req: if ver <= ver_req:
return ver return ver
@ -64,16 +64,16 @@ class DATAInterface(UDPLink):
data, _ = self.sock.recvfrom(512) data, _ = self.sock.recvfrom(512)
return data return data
def recv_l12trx_msg(self): def recv_tx_msg(self):
# Read raw data from socket # Read raw data from socket
data = self.recv_raw_data() data = self.recv_raw_data()
# Attempt to parse as a L12TRX message # Attempt to parse a TRXD Tx message
try: try:
msg = DATAMSG_L12TRX() msg = TxMsg()
msg.parse_msg(bytearray(data)) msg.parse_msg(bytearray(data))
except: except:
log.error("Failed to parse a L12TRX message " log.error("Failed to parse a TRXD Tx message "
"from R:%s:%u" % (self.remote_addr, self.remote_port)) "from R:%s:%u" % (self.remote_addr, self.remote_port))
return None return None
@ -84,16 +84,16 @@ class DATAInterface(UDPLink):
return msg return msg
def recv_trx2l1_msg(self): def recv_rx_msg(self):
# Read raw data from socket # Read raw data from socket
data = self.recv_raw_data() data = self.recv_raw_data()
# Attempt to parse as a L12TRX message # Attempt to parse a TRXD Rx message
try: try:
msg = DATAMSG_TRX2L1() msg = RxMsg()
msg.parse_msg(bytearray(data)) msg.parse_msg(bytearray(data))
except: except:
log.error("Failed to parse a TRX2L1 message " log.error("Failed to parse a TRXD Rx message "
"from R:%s:%u" % (self.remote_addr, self.remote_port)) "from R:%s:%u" % (self.remote_addr, self.remote_port))
return None return None
@ -106,10 +106,10 @@ class DATAInterface(UDPLink):
def send_msg(self, msg, legacy = False): def send_msg(self, msg, legacy = False):
try: try:
# Validate and encode TRXD message # Validate and encode a TRXD message
payload = msg.gen_msg(legacy) payload = msg.gen_msg(legacy)
except ValueError as e: except ValueError as e:
log.error("Failed to encode a TRX2L1 message ('%s') " log.error("Failed to encode a TRXD message ('%s') "
"due to error: %s" % (msg.desc_hdr(), e)) "due to error: %s" % (msg.desc_hdr(), e))
# TODO: we may want to send a NOPE.ind here # TODO: we may want to send a NOPE.ind here
return return

View File

@ -59,7 +59,7 @@ class Modulation(Enum):
return mod return mod
return None return None
class DATAMSG(abc.ABC): class Msg(abc.ABC):
''' TRXD (DATA) message coding API (common part). ''' ''' TRXD (DATA) message coding API (common part). '''
# NOTE: up to 16 versions can be encoded # NOTE: up to 16 versions can be encoded
@ -228,8 +228,8 @@ class DATAMSG(abc.ABC):
else: else:
self.burst = None self.burst = None
class DATAMSG_L12TRX(DATAMSG): class TxMsg(Msg):
''' L12TRX (L1 -> TRX) message coding API. ''' ''' Tx (L1 -> TRX) message coding API. '''
# Constants # Constants
PWR_MIN = 0x00 PWR_MIN = 0x00
@ -257,7 +257,7 @@ class DATAMSG_L12TRX(DATAMSG):
''' Validate the message fields (throws ValueError). ''' ''' Validate the message fields (throws ValueError). '''
# Validate common fields # Validate common fields
DATAMSG.validate(self) Msg.validate(self)
if self.pwr is None: if self.pwr is None:
raise ValueError("Tx Attenuation level is not set") raise ValueError("Tx Attenuation level is not set")
@ -287,14 +287,14 @@ class DATAMSG_L12TRX(DATAMSG):
def rand_hdr(self): def rand_hdr(self):
''' Randomize message specific header. ''' ''' Randomize message specific header. '''
DATAMSG.rand_hdr(self) Msg.rand_hdr(self)
self.pwr = self.rand_pwr() self.pwr = self.rand_pwr()
def desc_hdr(self): def desc_hdr(self):
''' Generate human-readable header description. ''' ''' Generate human-readable header description. '''
# Describe the common part # Describe the common part
result = DATAMSG.desc_hdr(self) result = Msg.desc_hdr(self)
if self.pwr is not None: if self.pwr is not None:
result += ("pwr=%u " % self.pwr) result += ("pwr=%u " % self.pwr)
@ -340,11 +340,11 @@ class DATAMSG_L12TRX(DATAMSG):
''' Generate a random message specific burst. ''' ''' Generate a random message specific burst. '''
self.burst = [random.randint(0, 1) for _ in range(length)] self.burst = [random.randint(0, 1) for _ in range(length)]
def gen_trx2l1(self, ver = None): def trans(self, ver = None):
''' Transform this message to TRX2L1 message. ''' ''' Transform this message into RxMsg. '''
# Allocate a new message # Allocate a new message
msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn, msg = RxMsg(fn = self.fn, tn = self.tn,
ver = self.ver if ver is None else ver) ver = self.ver if ver is None else ver)
# Convert burst bits # Convert burst bits
@ -355,8 +355,8 @@ class DATAMSG_L12TRX(DATAMSG):
return msg return msg
class DATAMSG_TRX2L1(DATAMSG): class RxMsg(Msg):
''' TRX2L1 (TRX -> L1) message coding API. ''' ''' Rx (TRX -> L1) message coding API. '''
# rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
RSSI_MIN = -120 RSSI_MIN = -120
@ -442,7 +442,7 @@ class DATAMSG_TRX2L1(DATAMSG):
''' Validate the message header fields (throws ValueError). ''' ''' Validate the message header fields (throws ValueError). '''
# Validate common fields # Validate common fields
DATAMSG.validate(self) Msg.validate(self)
if self.rssi is None: if self.rssi is None:
raise ValueError("RSSI is not set") raise ValueError("RSSI is not set")
@ -512,7 +512,7 @@ class DATAMSG_TRX2L1(DATAMSG):
def rand_hdr(self): def rand_hdr(self):
''' Randomize message specific header. ''' ''' Randomize message specific header. '''
DATAMSG.rand_hdr(self) Msg.rand_hdr(self)
self.rssi = self.rand_rssi() self.rssi = self.rand_rssi()
self.toa256 = self.rand_toa256() self.toa256 = self.rand_toa256()
@ -531,7 +531,7 @@ class DATAMSG_TRX2L1(DATAMSG):
''' Generate human-readable header description. ''' ''' Generate human-readable header description. '''
# Describe the common part # Describe the common part
result = DATAMSG.desc_hdr(self) result = Msg.desc_hdr(self)
if self.rssi is not None: if self.rssi is not None:
result += ("rssi=%d " % self.rssi) result += ("rssi=%d " % self.rssi)
@ -679,11 +679,11 @@ class DATAMSG_TRX2L1(DATAMSG):
self.burst = [random.randint(-127, 127) for _ in range(length)] self.burst = [random.randint(-127, 127) for _ in range(length)]
def gen_l12trx(self, ver = None): def trans(self, ver = None):
''' Transform this message to L12TRX message. ''' ''' Transform this message to TxMsg. '''
# Allocate a new message # Allocate a new message
msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn, msg = TxMsg(fn = self.fn, tn = self.tn,
ver = self.ver if ver is None else ver) ver = self.ver if ver is None else ver)
# Convert burst bits # Convert burst bits

View File

@ -213,7 +213,7 @@ class FakeTRX(Transceiver):
msg.tsc_set = 0 msg.tsc_set = 0
msg.tsc = 0 msg.tsc = 0
# Takes (partially initialized) TRX2L1 message, # Takes (partially initialized) TRXD Rx message,
# simulates RF path parameters (such as RSSI), # simulates RF path parameters (such as RSSI),
# and sends towards the L1 # and sends towards the L1
def handle_data_msg(self, src_trx, src_msg, msg): def handle_data_msg(self, src_trx, src_msg, msg):

View File

@ -50,12 +50,12 @@ class DATADump_Test(unittest.TestCase):
# Burst bits (if present) # Burst bits (if present)
self.assertEqual(a.burst, b.burst) self.assertEqual(a.burst, b.burst)
# TRX2L1 specific fields # TxMsg specific fields
if isinstance(a, DATAMSG_L12TRX): if isinstance(a, TxMsg):
self.assertEqual(a.pwr, b.pwr) self.assertEqual(a.pwr, b.pwr)
# L12TRX specific fields # RxMsg specific fields
if isinstance(a, DATAMSG_TRX2L1): if isinstance(a, RxMsg):
# Version independent fields # Version independent fields
self.assertEqual(a.toa256, b.toa256) self.assertEqual(a.toa256, b.toa256)
self.assertEqual(a.rssi, b.rssi) self.assertEqual(a.rssi, b.rssi)
@ -88,8 +88,8 @@ class DATADump_Test(unittest.TestCase):
# Generate a mixed list of random messages # Generate a mixed list of random messages
def _gen_rand_message_mix(self, count, ver = 1): def _gen_rand_message_mix(self, count, ver = 1):
msg_list = [] msg_list = []
msg_list += self._gen_rand_messages(DATAMSG_TRX2L1, count) msg_list += self._gen_rand_messages(RxMsg, count)
msg_list += self._gen_rand_messages(DATAMSG_L12TRX, count) msg_list += self._gen_rand_messages(TxMsg, count)
random.shuffle(msg_list) random.shuffle(msg_list)
return msg_list return msg_list
@ -100,15 +100,15 @@ class DATADump_Test(unittest.TestCase):
msg = self._ddf.parse_msg(0) msg = self._ddf.parse_msg(0)
self._compare_msg(msg, msg_ref) self._compare_msg(msg, msg_ref)
# Store one TRX2L1 message in a file, read it back and compare # Store one Rx message in a file, read it back and compare
def test_store_and_parse_trx2l1(self): def test_store_and_parse_rx_msg(self):
self._test_store_and_parse(DATAMSG_TRX2L1) self._test_store_and_parse(RxMsg)
# Store one L12TRX message in a file, read it back and compare # Store one Tx message in a file, read it back and compare
def test_store_and_parse_l12trx(self): def test_store_and_parse_tx_msg(self):
self._test_store_and_parse(DATAMSG_L12TRX) self._test_store_and_parse(TxMsg)
# Store multiple TRX2L1/L12TRX messages in a file, read them back and compare # Store multiple Rx/Tx messages in a file, read them back and compare
def test_store_and_parse_all(self): def test_store_and_parse_all(self):
# Store a mixed list of random messages (19 + 19) # Store a mixed list of random messages (19 + 19)
msg_list_ref = self._gen_rand_message_mix(19) msg_list_ref = self._gen_rand_message_mix(19)
@ -143,7 +143,7 @@ class DATADump_Test(unittest.TestCase):
def test_parse_len_overflow(self): def test_parse_len_overflow(self):
# Write a malformed message directly # Write a malformed message directly
self._tf.write(DATADump.TAG_L12TRX) self._tf.write(DATADump.TAG_TxMsg)
self._tf.write(b'\x00\x63') # 99 self._tf.write(b'\x00\x63') # 99
self._tf.write(b'\xff' * 90) self._tf.write(b'\xff' * 90)

View File

@ -24,9 +24,9 @@
import unittest import unittest
from data_msg import DATAMSG, DATAMSG_L12TRX, DATAMSG_TRX2L1 from data_msg import Msg, TxMsg, RxMsg
class DATAMSG_Test(unittest.TestCase): class Msg_Test(unittest.TestCase):
# Compare message a with message b # Compare message a with message b
def _compare_msg(self, a, b): def _compare_msg(self, a, b):
# Make sure we're comparing messages of the same type # Make sure we're comparing messages of the same type
@ -40,12 +40,12 @@ class DATAMSG_Test(unittest.TestCase):
# Burst bits (if present) # Burst bits (if present)
self.assertEqual(a.burst, b.burst) self.assertEqual(a.burst, b.burst)
# TRX2L1 specific fields # TxMsg specific fields
if isinstance(a, DATAMSG_L12TRX): if isinstance(a, TxMsg):
self.assertEqual(a.pwr, b.pwr) self.assertEqual(a.pwr, b.pwr)
# L12TRX specific fields # RxMsg specific fields
if isinstance(a, DATAMSG_TRX2L1): if isinstance(a, RxMsg):
# Version independent fields # Version independent fields
self.assertEqual(a.toa256, b.toa256) self.assertEqual(a.toa256, b.toa256)
self.assertEqual(a.rssi, b.rssi) self.assertEqual(a.rssi, b.rssi)
@ -62,38 +62,38 @@ class DATAMSG_Test(unittest.TestCase):
def test_validate(self): def test_validate(self):
# Unknown version # Unknown version
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
msg = DATAMSG_TRX2L1(fn = 0, tn = 0, ver = 100) msg = RxMsg(fn = 0, tn = 0, ver = 100)
msg.validate() msg.validate()
# Uninitialized field # Uninitialized field
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
msg = DATAMSG_TRX2L1() msg = RxMsg()
msg.validate() msg.validate()
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
msg = DATAMSG_TRX2L1(fn = None, tn = 0) msg = RxMsg(fn = None, tn = 0)
msg.validate() msg.validate()
# Out-of-range value(s) # Out-of-range value(s)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
msg = DATAMSG_TRX2L1(fn = -1, tn = 0) msg = RxMsg(fn = -1, tn = 0)
msg.validate() msg.validate()
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
msg = DATAMSG_TRX2L1(fn = 0, tn = 10) msg = RxMsg(fn = 0, tn = 10)
msg.validate() msg.validate()
# Validate header and burst randomization # Validate header and burst randomization
def test_rand_hdr_burst(self): def test_rand_hdr_burst(self):
msg_l12trx = DATAMSG_L12TRX() tx_msg = TxMsg()
msg_trx2l1 = DATAMSG_TRX2L1() rx_msg = RxMsg()
for i in range(100): for i in range(100):
msg_l12trx.rand_burst() tx_msg.rand_burst()
msg_trx2l1.rand_burst() rx_msg.rand_burst()
msg_l12trx.rand_hdr() tx_msg.rand_hdr()
msg_trx2l1.rand_hdr() rx_msg.rand_hdr()
msg_l12trx.validate() tx_msg.validate()
msg_trx2l1.validate() rx_msg.validate()
def _test_enc_dec(self, msg, legacy = False, nope_ind = False): def _test_enc_dec(self, msg, legacy = False, nope_ind = False):
# Prepare a given message (randomize) # Prepare a given message (randomize)
@ -120,22 +120,22 @@ class DATAMSG_Test(unittest.TestCase):
# Validate encoding and decoding # Validate encoding and decoding
def test_enc_dec(self): def test_enc_dec(self):
for ver in DATAMSG.KNOWN_VERSIONS: for ver in Msg.KNOWN_VERSIONS:
with self.subTest("L1 -> TRX message", ver = ver): with self.subTest("TxMsg", ver = ver):
msg = DATAMSG_L12TRX(ver = ver) msg = TxMsg(ver = ver)
self._test_enc_dec(msg) self._test_enc_dec(msg)
with self.subTest("TRX -> L1 message", ver = ver): with self.subTest("RxMsg", ver = ver):
msg = DATAMSG_TRX2L1(ver = ver) msg = RxMsg(ver = ver)
self._test_enc_dec(msg) self._test_enc_dec(msg)
if ver >= 1: if ver >= 1:
with self.subTest("TRX -> L1 NOPE.ind", ver = ver): with self.subTest("RxMsg NOPE.ind", ver = ver):
msg = DATAMSG_TRX2L1(ver = ver) msg = RxMsg(ver = ver)
self._test_enc_dec(msg, nope_ind = True) self._test_enc_dec(msg, nope_ind = True)
with self.subTest("TRX -> L1 message (legacy)"): with self.subTest("RxMsg (legacy transceiver)"):
msg = DATAMSG_TRX2L1(ver = 0) msg = RxMsg(ver = 0)
self._test_enc_dec(msg, legacy = True) self._test_enc_dec(msg, legacy = True)
# Validate bit conversations # Validate bit conversations
@ -144,16 +144,16 @@ class DATAMSG_Test(unittest.TestCase):
sbits_ref = list(range(-127, 128)) sbits_ref = list(range(-127, 128))
# Test both usbit2sbit() and sbit2usbit() # Test both usbit2sbit() and sbit2usbit()
sbits = DATAMSG.usbit2sbit(usbits_ref) sbits = Msg.usbit2sbit(usbits_ref)
usbits = DATAMSG.sbit2usbit(sbits) usbits = Msg.sbit2usbit(sbits)
self.assertEqual(usbits[:255], usbits_ref[:255]) self.assertEqual(usbits[:255], usbits_ref[:255])
self.assertEqual(usbits[255], 254) self.assertEqual(usbits[255], 254)
# Test both sbit2ubit() and ubit2sbit() # Test both sbit2ubit() and ubit2sbit()
ubits = DATAMSG.sbit2ubit(sbits_ref) ubits = Msg.sbit2ubit(sbits_ref)
self.assertEqual(ubits, ([1] * 127 + [0] * 128)) self.assertEqual(ubits, ([1] * 127 + [0] * 128))
sbits = DATAMSG.ubit2sbit(ubits) sbits = Msg.ubit2sbit(ubits)
self.assertEqual(sbits, ([-127] * 127 + [127] * 128)) self.assertEqual(sbits, ([-127] * 127 + [127] * 128))
def _test_transform(self, msg): def _test_transform(self, msg):
@ -162,31 +162,31 @@ class DATAMSG_Test(unittest.TestCase):
msg.rand_burst() msg.rand_burst()
# Perform message transformation # Perform message transformation
if isinstance(msg, DATAMSG_L12TRX): if isinstance(msg, TxMsg):
msg_trans = msg.gen_trx2l1() msg_trans = msg.trans()
else: else:
msg_trans = msg.gen_l12trx() msg_trans = msg.trans()
self.assertEqual(msg_trans.ver, msg.ver) self.assertEqual(msg_trans.ver, msg.ver)
self.assertEqual(msg_trans.fn, msg.fn) self.assertEqual(msg_trans.fn, msg.fn)
self.assertEqual(msg_trans.tn, msg.tn) self.assertEqual(msg_trans.tn, msg.tn)
if isinstance(msg, DATAMSG_TRX2L1): if isinstance(msg, RxMsg):
burst = DATAMSG.sbit2ubit(msg.burst) burst = Msg.sbit2ubit(msg.burst)
self.assertEqual(msg_trans.burst, burst) self.assertEqual(msg_trans.burst, burst)
else: else:
burst = DATAMSG.ubit2sbit(msg.burst) burst = Msg.ubit2sbit(msg.burst)
self.assertEqual(msg_trans.burst, burst) self.assertEqual(msg_trans.burst, burst)
# Validate message transformation # Validate message transformation
def test_transform(self): def test_transform(self):
for ver in DATAMSG.KNOWN_VERSIONS: for ver in Msg.KNOWN_VERSIONS:
with self.subTest("L1 -> TRX message", ver = ver): with self.subTest("TxMsg", ver = ver):
msg = DATAMSG_L12TRX(ver = ver) msg = TxMsg(ver = ver)
self._test_transform(msg) self._test_transform(msg)
with self.subTest("TRX -> L1 message", ver = ver): with self.subTest("RxMsg", ver = ver):
msg = DATAMSG_TRX2L1(ver = ver) msg = RxMsg(ver = ver)
self._test_transform(msg) self._test_transform(msg)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -258,7 +258,7 @@ class Transceiver:
def recv_data_msg(self): def recv_data_msg(self):
# Read and parse data from socket # Read and parse data from socket
msg = self.data_if.recv_l12trx_msg() msg = self.data_if.recv_tx_msg()
if not msg: if not msg:
return None return None

View File

@ -101,10 +101,10 @@ class Application(ApplicationBase):
msg_raw = bytearray(trx.load) msg_raw = bytearray(trx.load)
# Determine a burst direction (L1 <-> TRX) # Determine a burst direction (L1 <-> TRX)
l12trx = udp.sport > udp.dport tx_dir = udp.sport > udp.dport
# Create an empty DATA message # Create an empty DATA message
msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1() msg = TxMsg() if tx_dir else RxMsg()
# Attempt to parse the payload as a DATA message # Attempt to parse the payload as a DATA message
try: try:
@ -124,8 +124,7 @@ class Application(ApplicationBase):
return return
# Debug print # Debug print
log.debug("%s burst: %s" \ log.debug("%s burst: %s", "L1 -> TRX" if tx_dir else "TRX -> L1", msg.desc_hdr())
% ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
# Poke message handler # Poke message handler
self.msg_handle(msg) self.msg_handle(msg)
@ -139,10 +138,10 @@ class Application(ApplicationBase):
# Direction filter # Direction filter
if self.argv.direction is not None: if self.argv.direction is not None:
if self.argv.direction == "TRX": # L1 -> TRX if self.argv.direction == "TRX": # L1 -> TRX
if not isinstance(msg, DATAMSG_L12TRX): if not isinstance(msg, TxMsg):
return False return False
elif self.argv.direction == "L1": # TRX -> L1 elif self.argv.direction == "L1": # TRX -> L1
if not isinstance(msg, DATAMSG_TRX2L1): if not isinstance(msg, RxMsg):
return False return False
# Timeslot filter # Timeslot filter
@ -159,7 +158,7 @@ class Application(ApplicationBase):
return False return False
# Message type specific filtering # Message type specific filtering
if isinstance(msg, DATAMSG_TRX2L1): if isinstance(msg, RxMsg):
# NOPE.ind filter # NOPE.ind filter
if not self.argv.pf_nope_ind and msg.nope_ind: if not self.argv.pf_nope_ind and msg.nope_ind:
return False return False