trx_toolkit/data_dump.py: rewrite unit tests to use unittest framework
Change-Id: I8b934c15ba96d856aa79d10bf296d1446f043dd1
This commit is contained in:
parent
5cee398277
commit
7a49fc31c1
|
@ -220,148 +220,3 @@ class DATADumpFile(DATADump):
|
|||
def append_all(self, msgs):
|
||||
for msg in msgs:
|
||||
self.append_msg(msg)
|
||||
|
||||
# Regression tests
|
||||
if __name__ == '__main__':
|
||||
from tempfile import TemporaryFile
|
||||
from gsm_shared import *
|
||||
import random
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
# Create a temporary file
|
||||
tf = TemporaryFile()
|
||||
|
||||
# Create an instance of DATA dump manager
|
||||
ddf = DATADumpFile(tf)
|
||||
|
||||
# Generate two random bursts
|
||||
burst_l12trx = []
|
||||
burst_trx2l1 = []
|
||||
|
||||
for i in range(0, GSM_BURST_LEN):
|
||||
ubit = random.randint(0, 1)
|
||||
burst_l12trx.append(ubit)
|
||||
|
||||
sbit = random.randint(-127, 127)
|
||||
burst_trx2l1.append(sbit)
|
||||
|
||||
# Generate a basic list of random messages
|
||||
log.info("Generating the reference messages")
|
||||
messages_ref = []
|
||||
|
||||
for i in range(100):
|
||||
# Create a message
|
||||
if i % 2:
|
||||
msg = DATAMSG_L12TRX()
|
||||
msg.burst = burst_l12trx
|
||||
else:
|
||||
msg = DATAMSG_TRX2L1()
|
||||
msg.burst = burst_trx2l1
|
||||
|
||||
# Randomize the header
|
||||
msg.rand_hdr()
|
||||
|
||||
# Append
|
||||
messages_ref.append(msg)
|
||||
|
||||
log.info("Adding the following messages to the capture:")
|
||||
for msg in messages_ref[:3]:
|
||||
log.info("%s: burst_len=%d"
|
||||
% (msg.desc_hdr(), len(msg.burst)))
|
||||
|
||||
# Check single message appending
|
||||
ddf.append_msg(messages_ref[0])
|
||||
ddf.append_msg(messages_ref[1])
|
||||
ddf.append_msg(messages_ref[2])
|
||||
|
||||
# Read the written messages back
|
||||
messages_check = ddf.parse_all()
|
||||
|
||||
log.info("Read the following messages back:")
|
||||
for msg in messages_check:
|
||||
log.info("%s: burst_len=%d"
|
||||
% (msg.desc_hdr(), len(msg.burst)))
|
||||
|
||||
# Expecting three messages
|
||||
assert(len(messages_check) == 3)
|
||||
|
||||
# Check the messages
|
||||
for i in range(3):
|
||||
# Compare common header parts and bursts
|
||||
assert(messages_check[i].burst == messages_ref[i].burst)
|
||||
assert(messages_check[i].fn == messages_ref[i].fn)
|
||||
assert(messages_check[i].tn == messages_ref[i].tn)
|
||||
|
||||
# Validate a message
|
||||
messages_check[i].validate()
|
||||
|
||||
log.info("Check append_msg(): OK")
|
||||
|
||||
|
||||
# Append the pending reference messages
|
||||
ddf.append_all(messages_ref[3:])
|
||||
|
||||
# Read the written messages back
|
||||
messages_check = ddf.parse_all()
|
||||
|
||||
# Check the final amount
|
||||
assert(len(messages_check) == len(messages_ref))
|
||||
|
||||
# Check the messages
|
||||
for i in range(len(messages_check)):
|
||||
# Compare common header parts and bursts
|
||||
assert(messages_check[i].burst == messages_ref[i].burst)
|
||||
assert(messages_check[i].fn == messages_ref[i].fn)
|
||||
assert(messages_check[i].tn == messages_ref[i].tn)
|
||||
|
||||
# Validate a message
|
||||
messages_check[i].validate()
|
||||
|
||||
log.info("Check append_all(): OK")
|
||||
|
||||
|
||||
# Check parse_msg()
|
||||
msg0 = ddf.parse_msg(0)
|
||||
msg10 = ddf.parse_msg(10)
|
||||
|
||||
# Make sure parsing was successful
|
||||
assert(msg0 and msg10)
|
||||
|
||||
# Compare common header parts and bursts
|
||||
assert(msg0.burst == messages_ref[0].burst)
|
||||
assert(msg0.fn == messages_ref[0].fn)
|
||||
assert(msg0.tn == messages_ref[0].tn)
|
||||
|
||||
assert(msg10.burst == messages_ref[10].burst)
|
||||
assert(msg10.fn == messages_ref[10].fn)
|
||||
assert(msg10.tn == messages_ref[10].tn)
|
||||
|
||||
# Validate both messages
|
||||
msg0.validate()
|
||||
msg10.validate()
|
||||
|
||||
log.info("Check parse_msg(): OK")
|
||||
|
||||
|
||||
# Check parse_all() with range
|
||||
messages_check = ddf.parse_all(skip = 10, count = 20)
|
||||
|
||||
# Make sure parsing was successful
|
||||
assert(messages_check)
|
||||
|
||||
# Check the amount
|
||||
assert(len(messages_check) == 20)
|
||||
|
||||
for i in range(20):
|
||||
# Compare common header parts and bursts
|
||||
assert(messages_check[i].burst == messages_ref[i + 10].burst)
|
||||
assert(messages_check[i].fn == messages_ref[i + 10].fn)
|
||||
assert(messages_check[i].tn == messages_ref[i + 10].tn)
|
||||
|
||||
# Validate a message
|
||||
messages_check[i].validate()
|
||||
|
||||
log.info("Check parse_all(): OK")
|
||||
|
|
|
@ -0,0 +1,165 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# TRX Toolkit
|
||||
# Unit tests for DATA capture management
|
||||
#
|
||||
# (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com>
|
||||
#
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import unittest
|
||||
import tempfile
|
||||
import random
|
||||
|
||||
from gsm_shared import *
|
||||
from data_dump import *
|
||||
|
||||
class DATADump_Test(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Create a temporary file
|
||||
self._tf = tempfile.TemporaryFile(mode = 'w+b')
|
||||
|
||||
# Create an instance of DATA dump manager
|
||||
self._ddf = DATADumpFile(self._tf)
|
||||
|
||||
# Compare message a with message b
|
||||
def _compare_msg(self, a, b):
|
||||
# Make sure we're comparing messages of the same type
|
||||
self.assertEqual(a.__class__, b.__class__)
|
||||
|
||||
# Compare common header fields
|
||||
self.assertEqual(a.ver, b.ver)
|
||||
self.assertEqual(a.fn, b.fn)
|
||||
self.assertEqual(a.tn, b.tn)
|
||||
|
||||
# Burst bits (if present)
|
||||
self.assertEqual(a.burst, b.burst)
|
||||
|
||||
# TRX2L1 specific fields
|
||||
if isinstance(a, DATAMSG_L12TRX):
|
||||
self.assertEqual(a.pwr, b.pwr)
|
||||
|
||||
# L12TRX specific fields
|
||||
if isinstance(a, DATAMSG_TRX2L1):
|
||||
# Version independent fields
|
||||
self.assertEqual(a.toa256, b.toa256)
|
||||
self.assertEqual(a.rssi, b.rssi)
|
||||
|
||||
# Version specific fields
|
||||
if a.ver >= 1:
|
||||
self.assertEqual(a.nope_ind, b.nope_ind)
|
||||
self.assertEqual(a.mod_type, b.mod_type)
|
||||
self.assertEqual(a.tsc_set, b.tsc_set)
|
||||
self.assertEqual(a.tsc, b.tsc)
|
||||
self.assertEqual(a.ci, b.ci)
|
||||
|
||||
# Generate a random message of a given type / version
|
||||
def _gen_rand_message(self, cls, ver = 1):
|
||||
msg = cls(ver = ver)
|
||||
msg.rand_hdr()
|
||||
msg.rand_burst()
|
||||
return msg
|
||||
|
||||
# Generate a list of random messages
|
||||
def _gen_rand_messages(self, cls, count, ver = 1):
|
||||
msg_list = []
|
||||
|
||||
for i in range(count):
|
||||
msg = self._gen_rand_message(cls, ver)
|
||||
msg_list.append(msg)
|
||||
|
||||
return msg_list
|
||||
|
||||
# Generate a mixed list of random messages
|
||||
def _gen_rand_message_mix(self, count, ver = 1):
|
||||
msg_list = []
|
||||
msg_list += self._gen_rand_messages(DATAMSG_TRX2L1, count)
|
||||
msg_list += self._gen_rand_messages(DATAMSG_L12TRX, count)
|
||||
random.shuffle(msg_list)
|
||||
return msg_list
|
||||
|
||||
def _test_store_and_parse(self, cls):
|
||||
msg_ref = self._gen_rand_message(cls)
|
||||
self._ddf.append_msg(msg_ref)
|
||||
|
||||
msg = self._ddf.parse_msg(0)
|
||||
self._compare_msg(msg, msg_ref)
|
||||
|
||||
# Store one TRX2L1 message in a file, read it back and compare
|
||||
def test_store_and_parse_trx2l1(self):
|
||||
self._test_store_and_parse(DATAMSG_TRX2L1)
|
||||
|
||||
# Store one L12TRX message in a file, read it back and compare
|
||||
def test_store_and_parse_l12trx(self):
|
||||
self._test_store_and_parse(DATAMSG_L12TRX)
|
||||
|
||||
# Store multiple TRX2L1/L12TRX messages in a file, read them back and compare
|
||||
def test_store_and_parse_all(self):
|
||||
# Store a mixed list of random messages (19 + 19)
|
||||
msg_list_ref = self._gen_rand_message_mix(19)
|
||||
self._ddf.append_all(msg_list_ref)
|
||||
|
||||
# Retrieve and compare stored messages
|
||||
msg_list = self._ddf.parse_all()
|
||||
for i in range(len(msg_list_ref)):
|
||||
self._compare_msg(msg_list[i], msg_list_ref[i])
|
||||
|
||||
# Verify random access to stored messages
|
||||
def test_parse_msg_idx(self):
|
||||
# Store a mixed list of random messages (19 + 19)
|
||||
msg_list_ref = self._gen_rand_message_mix(19)
|
||||
self._ddf.append_all(msg_list_ref)
|
||||
|
||||
# Random access
|
||||
for _ in range(100):
|
||||
idx = random.randrange(len(msg_list_ref))
|
||||
msg = self._ddf.parse_msg(idx)
|
||||
self._compare_msg(msg, msg_list_ref[idx])
|
||||
|
||||
def test_parse_empty(self):
|
||||
with self.assertLogs(level = 'ERROR'):
|
||||
idx = random.randrange(100)
|
||||
msg = self._ddf.parse_msg(idx)
|
||||
self.assertEqual(msg, False)
|
||||
|
||||
def test_parse_all_empty(self):
|
||||
msg_list = self._ddf.parse_all()
|
||||
self.assertEqual(msg_list, [])
|
||||
|
||||
def test_parse_len_overflow(self):
|
||||
# Write a malformed message directly
|
||||
self._tf.write(DATADump.TAG_L12TRX)
|
||||
self._tf.write(b'\x00\x63') # 99
|
||||
self._tf.write(b'\xff' * 90)
|
||||
|
||||
with self.assertLogs(level = 'ERROR'):
|
||||
msg = self._ddf.parse_msg(0)
|
||||
self.assertEqual(msg, None)
|
||||
|
||||
def test_parse_unknown_tag(self):
|
||||
# Write a malformed message directly
|
||||
self._tf.write(b'\x33')
|
||||
self._tf.write(b'\x00\x63') # 99
|
||||
self._tf.write(b'\xff' * 90)
|
||||
|
||||
with self.assertLogs(level = 'ERROR'):
|
||||
msg = self._ddf.parse_msg(0)
|
||||
self.assertEqual(msg, None)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue