trx_toolkit/data_msg.py: rewrite unit tests to use unittest framework
Change-Id: Ia0cc7447b193a705e994078d16f3902339219916
This commit is contained in:
parent
f445db03c7
commit
5cee398277
|
@ -856,215 +856,3 @@ class DATAMSG_TRX2L1(DATAMSG):
|
|||
msg.burst = self.sbit2ubit(self.burst)
|
||||
|
||||
return msg
|
||||
|
||||
# Regression test
|
||||
if __name__ == '__main__':
|
||||
import logging as log
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
log.info("Generating the reference messages")
|
||||
|
||||
# Create messages of both types
|
||||
msg_l12trx_ref = DATAMSG_L12TRX()
|
||||
msg_trx2l1_ref = DATAMSG_TRX2L1()
|
||||
|
||||
# Validate header randomization
|
||||
for i in range(0, 100):
|
||||
msg_l12trx_ref.rand_hdr()
|
||||
msg_trx2l1_ref.rand_hdr()
|
||||
|
||||
msg_l12trx_ref.rand_burst()
|
||||
msg_trx2l1_ref.rand_burst()
|
||||
|
||||
msg_l12trx_ref.validate()
|
||||
msg_trx2l1_ref.validate()
|
||||
|
||||
log.info("Validate header randomization: OK")
|
||||
|
||||
# Test error handling for common fields
|
||||
msg = DATAMSG()
|
||||
|
||||
# Make sure that message validation throws a ValueError
|
||||
def validate_throw(msg):
|
||||
try:
|
||||
msg.validate()
|
||||
return False
|
||||
except ValueError:
|
||||
return True
|
||||
|
||||
# Unknown version
|
||||
msg.rand_hdr()
|
||||
msg.ver = 100
|
||||
assert(validate_throw(msg))
|
||||
|
||||
# Uninitialized field
|
||||
msg.rand_hdr()
|
||||
msg.fn = None
|
||||
assert(validate_throw(msg))
|
||||
|
||||
# Out-of-range value
|
||||
msg.rand_hdr()
|
||||
msg.tn = 10
|
||||
assert(validate_throw(msg))
|
||||
|
||||
log.info("Check incorrect message validation: OK")
|
||||
|
||||
log.info("Encoding the reference messages")
|
||||
|
||||
# Encode DATA messages
|
||||
l12trx_raw = msg_l12trx_ref.gen_msg()
|
||||
trx2l1_raw = msg_trx2l1_ref.gen_msg()
|
||||
|
||||
# Encode a TRX2L1 message in legacy mode
|
||||
trx2l1_raw_legacy = msg_trx2l1_ref.gen_msg(legacy = True)
|
||||
|
||||
log.info("Parsing generated messages back")
|
||||
|
||||
# Parse generated DATA messages
|
||||
msg_l12trx_dec = DATAMSG_L12TRX()
|
||||
msg_trx2l1_dec = DATAMSG_TRX2L1()
|
||||
msg_l12trx_dec.parse_msg(l12trx_raw)
|
||||
msg_trx2l1_dec.parse_msg(trx2l1_raw)
|
||||
|
||||
# Parse generated TRX2L1 message in legacy mode
|
||||
msg_trx2l1_legacy_dec = DATAMSG_TRX2L1()
|
||||
msg_trx2l1_legacy_dec.parse_msg(trx2l1_raw_legacy)
|
||||
|
||||
log.info("Comparing decoded messages with the reference")
|
||||
|
||||
# Compare bursts
|
||||
assert(msg_l12trx_dec.burst == msg_l12trx_ref.burst)
|
||||
assert(msg_trx2l1_dec.burst == msg_trx2l1_ref.burst)
|
||||
assert(msg_trx2l1_legacy_dec.burst == msg_trx2l1_ref.burst)
|
||||
|
||||
log.info("Compare bursts: OK")
|
||||
|
||||
# Compare both parsed messages with the reference data
|
||||
assert(msg_l12trx_dec.fn == msg_l12trx_ref.fn)
|
||||
assert(msg_trx2l1_dec.fn == msg_trx2l1_ref.fn)
|
||||
assert(msg_l12trx_dec.tn == msg_l12trx_ref.tn)
|
||||
assert(msg_trx2l1_dec.tn == msg_trx2l1_ref.tn)
|
||||
|
||||
log.info("Compare FN / TN: OK")
|
||||
|
||||
# Compare message specific parts
|
||||
assert(msg_trx2l1_dec.rssi == msg_trx2l1_ref.rssi)
|
||||
assert(msg_l12trx_dec.pwr == msg_l12trx_ref.pwr)
|
||||
assert(msg_trx2l1_dec.toa256 == msg_trx2l1_ref.toa256)
|
||||
|
||||
log.info("Compare message specific data: OK")
|
||||
|
||||
# Bit conversation test
|
||||
usbits_ref = list(range(0, 256))
|
||||
sbits_ref = list(range(-127, 128))
|
||||
|
||||
# Test both usbit2sbit() and sbit2usbit()
|
||||
sbits = DATAMSG.usbit2sbit(usbits_ref)
|
||||
usbits = DATAMSG.sbit2usbit(sbits)
|
||||
assert(usbits[:255] == usbits_ref[:255])
|
||||
assert(usbits[255] == 254)
|
||||
|
||||
log.info("Check both usbit2sbit() and sbit2usbit(): OK")
|
||||
|
||||
# Test both sbit2ubit() and ubit2sbit()
|
||||
ubits = DATAMSG.sbit2ubit(sbits_ref)
|
||||
assert(ubits == ([1] * 127 + [0] * 128))
|
||||
|
||||
sbits = DATAMSG.ubit2sbit(ubits)
|
||||
assert(sbits == ([-127] * 127 + [127] * 128))
|
||||
|
||||
log.info("Check both sbit2ubit() and ubit2sbit(): OK")
|
||||
|
||||
# Test message transformation
|
||||
msg_l12trx_dec = msg_trx2l1_ref.gen_l12trx()
|
||||
msg_trx2l1_dec = msg_l12trx_ref.gen_trx2l1()
|
||||
|
||||
assert(msg_l12trx_dec.fn == msg_trx2l1_ref.fn)
|
||||
assert(msg_l12trx_dec.tn == msg_trx2l1_ref.tn)
|
||||
|
||||
assert(msg_trx2l1_dec.fn == msg_l12trx_ref.fn)
|
||||
assert(msg_trx2l1_dec.tn == msg_l12trx_ref.tn)
|
||||
|
||||
assert(msg_l12trx_dec.burst == DATAMSG.sbit2ubit(msg_trx2l1_ref.burst))
|
||||
assert(msg_trx2l1_dec.burst == DATAMSG.ubit2sbit(msg_l12trx_ref.burst))
|
||||
|
||||
log.info("Check L12TRX <-> TRX2L1 type transformations: OK")
|
||||
|
||||
# Test header version coding
|
||||
for ver in DATAMSG.known_versions:
|
||||
# Create messages of both types
|
||||
msg_l12trx = DATAMSG_L12TRX(ver = ver)
|
||||
msg_trx2l1 = DATAMSG_TRX2L1(ver = ver)
|
||||
|
||||
# Randomize message specific headers
|
||||
msg_l12trx.rand_hdr()
|
||||
msg_trx2l1.rand_hdr()
|
||||
|
||||
# Randomize bursts
|
||||
msg_l12trx.rand_burst()
|
||||
msg_trx2l1.rand_burst()
|
||||
|
||||
# Encode DATA messages
|
||||
msg_l12trx_enc = msg_l12trx.gen_msg()
|
||||
msg_trx2l1_enc = msg_trx2l1.gen_msg()
|
||||
|
||||
# Parse generated DATA messages
|
||||
msg_l12trx_dec = DATAMSG_L12TRX()
|
||||
msg_trx2l1_dec = DATAMSG_TRX2L1()
|
||||
msg_l12trx_dec.parse_msg(msg_l12trx_enc)
|
||||
msg_trx2l1_dec.parse_msg(msg_trx2l1_enc)
|
||||
|
||||
# Match the header version
|
||||
assert(msg_l12trx_dec.ver == ver)
|
||||
assert(msg_trx2l1_dec.ver == ver)
|
||||
|
||||
# Match common TDMA fields
|
||||
assert(msg_l12trx_dec.tn == msg_l12trx.tn)
|
||||
assert(msg_trx2l1_dec.fn == msg_trx2l1.fn)
|
||||
|
||||
# Match version specific fields
|
||||
if msg_trx2l1.ver >= 0x01:
|
||||
assert(msg_trx2l1_dec.nope_ind == msg_trx2l1.nope_ind)
|
||||
assert(msg_trx2l1_dec.mod_type == msg_trx2l1.mod_type)
|
||||
assert(msg_trx2l1_dec.tsc_set == msg_trx2l1.tsc_set)
|
||||
assert(msg_trx2l1_dec.tsc == msg_trx2l1.tsc)
|
||||
assert(msg_trx2l1_dec.ci == msg_trx2l1.ci)
|
||||
|
||||
log.info("Check header version %u coding: OK" % ver)
|
||||
|
||||
# Compare bursts
|
||||
assert(msg_l12trx_dec.burst == msg_l12trx.burst)
|
||||
assert(msg_trx2l1_dec.burst == msg_trx2l1.burst)
|
||||
|
||||
msg_trx2l1_gen = msg_l12trx.gen_trx2l1()
|
||||
msg_l12trx_gen = msg_trx2l1.gen_l12trx()
|
||||
|
||||
assert(msg_trx2l1_gen is not None)
|
||||
assert(msg_l12trx_gen is not None)
|
||||
|
||||
# Match the header version
|
||||
assert(msg_trx2l1_gen.ver == ver)
|
||||
assert(msg_l12trx_gen.ver == ver)
|
||||
|
||||
# Match common TDMA fields
|
||||
assert(msg_trx2l1_gen.tn == msg_l12trx.tn)
|
||||
assert(msg_l12trx_gen.fn == msg_trx2l1.fn)
|
||||
|
||||
log.info("Verify version %u direct transformation: OK" % ver)
|
||||
|
||||
# Verify NOPE indication coding
|
||||
if msg_trx2l1.ver >= 0x01:
|
||||
msg_trx2l1 = DATAMSG_TRX2L1(ver = ver)
|
||||
msg_trx2l1.nope_ind = True
|
||||
msg_trx2l1.rand_hdr()
|
||||
|
||||
msg_trx2l1_dec = DATAMSG_TRX2L1()
|
||||
msg_trx2l1_dec.parse_msg(msg_trx2l1.gen_msg())
|
||||
|
||||
assert(msg_trx2l1.nope_ind == msg_trx2l1_dec.nope_ind)
|
||||
assert(msg_trx2l1.burst == msg_trx2l1_dec.burst)
|
||||
|
||||
log.info("Verify version %u NOPE indication coding: OK" % ver)
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# TRX Toolkit
|
||||
# Unit test for TRXD message codec
|
||||
#
|
||||
# (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com>
|
||||
#
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import unittest
|
||||
|
||||
from data_msg import DATAMSG, DATAMSG_L12TRX, DATAMSG_TRX2L1
|
||||
|
||||
class DATAMSG_Test(unittest.TestCase):
|
||||
# Compare message a with message b
|
||||
def _compare_msg(self, a, b):
|
||||
# Make sure we're comparing messages of the same type
|
||||
self.assertEqual(a.__class__, b.__class__)
|
||||
|
||||
# Compare common header fields
|
||||
self.assertEqual(a.ver, b.ver)
|
||||
self.assertEqual(a.fn, b.fn)
|
||||
self.assertEqual(a.tn, b.tn)
|
||||
|
||||
# Burst bits (if present)
|
||||
self.assertEqual(a.burst, b.burst)
|
||||
|
||||
# TRX2L1 specific fields
|
||||
if isinstance(a, DATAMSG_L12TRX):
|
||||
self.assertEqual(a.pwr, b.pwr)
|
||||
|
||||
# L12TRX specific fields
|
||||
if isinstance(a, DATAMSG_TRX2L1):
|
||||
# Version independent fields
|
||||
self.assertEqual(a.toa256, b.toa256)
|
||||
self.assertEqual(a.rssi, b.rssi)
|
||||
|
||||
# Version specific fields
|
||||
if a.ver >= 1:
|
||||
self.assertEqual(a.nope_ind, b.nope_ind)
|
||||
self.assertEqual(a.mod_type, b.mod_type)
|
||||
self.assertEqual(a.tsc_set, b.tsc_set)
|
||||
self.assertEqual(a.tsc, b.tsc)
|
||||
self.assertEqual(a.ci, b.ci)
|
||||
|
||||
# Make sure that message validation throws a ValueError
|
||||
def test_validate(self):
|
||||
# Unknown version
|
||||
with self.assertRaises(ValueError):
|
||||
msg = DATAMSG(fn = 0, tn = 0, ver = 100)
|
||||
msg.validate()
|
||||
|
||||
# Uninitialized field
|
||||
with self.assertRaises(ValueError):
|
||||
msg = DATAMSG()
|
||||
msg.validate()
|
||||
with self.assertRaises(ValueError):
|
||||
msg = DATAMSG(fn = None, tn = 0)
|
||||
msg.validate()
|
||||
|
||||
# Out-of-range value(s)
|
||||
with self.assertRaises(ValueError):
|
||||
msg = DATAMSG(fn = -1, tn = 0)
|
||||
msg.validate()
|
||||
with self.assertRaises(ValueError):
|
||||
msg = DATAMSG(fn = 0, tn = 10)
|
||||
msg.validate()
|
||||
|
||||
# Validate header and burst randomization
|
||||
def test_rand_hdr_burst(self):
|
||||
msg_l12trx = DATAMSG_L12TRX()
|
||||
msg_trx2l1 = DATAMSG_TRX2L1()
|
||||
|
||||
for i in range(100):
|
||||
msg_l12trx.rand_burst()
|
||||
msg_trx2l1.rand_burst()
|
||||
msg_l12trx.rand_hdr()
|
||||
msg_trx2l1.rand_hdr()
|
||||
|
||||
msg_l12trx.validate()
|
||||
msg_trx2l1.validate()
|
||||
|
||||
def _test_enc_dec(self, msg, legacy = False, nope_ind = False):
|
||||
# Prepare a given message (randomize)
|
||||
msg.rand_hdr()
|
||||
|
||||
# NOPE.ind contains no burst
|
||||
if not nope_ind:
|
||||
msg.rand_burst()
|
||||
else:
|
||||
msg.nope_ind = True
|
||||
msg.mod_type = None
|
||||
msg.tsc_set = None
|
||||
msg.tsc = None
|
||||
|
||||
# Encode a given message to bytes
|
||||
msg_enc = msg.gen_msg(legacy)
|
||||
|
||||
# Decode a new message from bytes
|
||||
msg_dec = msg.__class__()
|
||||
msg_dec.parse_msg(msg_enc)
|
||||
|
||||
# Compare decoded vs the original
|
||||
self._compare_msg(msg, msg_dec)
|
||||
|
||||
# Validate encoding and decoding
|
||||
def test_enc_dec(self):
|
||||
for ver in DATAMSG.known_versions:
|
||||
with self.subTest("L1 -> TRX message", ver = ver):
|
||||
msg = DATAMSG_L12TRX(ver = ver)
|
||||
self._test_enc_dec(msg)
|
||||
|
||||
with self.subTest("TRX -> L1 message", ver = ver):
|
||||
msg = DATAMSG_TRX2L1(ver = ver)
|
||||
self._test_enc_dec(msg)
|
||||
|
||||
if ver >= 1:
|
||||
with self.subTest("TRX -> L1 NOPE.ind", ver = ver):
|
||||
msg = DATAMSG_TRX2L1(ver = ver)
|
||||
self._test_enc_dec(msg, nope_ind = True)
|
||||
|
||||
with self.subTest("TRX -> L1 message (legacy)"):
|
||||
msg = DATAMSG_TRX2L1(ver = 0)
|
||||
self._test_enc_dec(msg, legacy = True)
|
||||
|
||||
# Validate bit conversations
|
||||
def test_bit_conv(self):
|
||||
usbits_ref = list(range(0, 256))
|
||||
sbits_ref = list(range(-127, 128))
|
||||
|
||||
# Test both usbit2sbit() and sbit2usbit()
|
||||
sbits = DATAMSG.usbit2sbit(usbits_ref)
|
||||
usbits = DATAMSG.sbit2usbit(sbits)
|
||||
self.assertEqual(usbits[:255], usbits_ref[:255])
|
||||
self.assertEqual(usbits[255], 254)
|
||||
|
||||
# Test both sbit2ubit() and ubit2sbit()
|
||||
ubits = DATAMSG.sbit2ubit(sbits_ref)
|
||||
self.assertEqual(ubits, ([1] * 127 + [0] * 128))
|
||||
|
||||
sbits = DATAMSG.ubit2sbit(ubits)
|
||||
self.assertEqual(sbits, ([-127] * 127 + [127] * 128))
|
||||
|
||||
def _test_transform(self, msg):
|
||||
# Prepare given messages
|
||||
msg.rand_hdr()
|
||||
msg.rand_burst()
|
||||
|
||||
# Perform message transformation
|
||||
if isinstance(msg, DATAMSG_L12TRX):
|
||||
msg_trans = msg.gen_trx2l1()
|
||||
else:
|
||||
msg_trans = msg.gen_l12trx()
|
||||
|
||||
self.assertEqual(msg_trans.ver, msg.ver)
|
||||
self.assertEqual(msg_trans.fn, msg.fn)
|
||||
self.assertEqual(msg_trans.tn, msg.tn)
|
||||
|
||||
if isinstance(msg, DATAMSG_TRX2L1):
|
||||
burst = DATAMSG.sbit2ubit(msg.burst)
|
||||
self.assertEqual(msg_trans.burst, burst)
|
||||
else:
|
||||
burst = DATAMSG.ubit2sbit(msg.burst)
|
||||
self.assertEqual(msg_trans.burst, burst)
|
||||
|
||||
# Validate message transformation
|
||||
def test_transform(self):
|
||||
for ver in DATAMSG.known_versions:
|
||||
with self.subTest("L1 -> TRX message", ver = ver):
|
||||
msg = DATAMSG_L12TRX(ver = ver)
|
||||
self._test_transform(msg)
|
||||
|
||||
with self.subTest("TRX -> L1 message", ver = ver):
|
||||
msg = DATAMSG_TRX2L1(ver = ver)
|
||||
self._test_transform(msg)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue