fake_trx: implement classes for DATA capture menagement
This change introduces the following classes: - DATADump - basic class, which contains methods to generate and parse the a message header, and some constants. - DATADumpFile - a child class, which contains methods to write and parse DATA messages from capture files. Usage example: # Open a capture file ddf = DATADumpFile("capture.bin") # Parse the 10th message msg = ddf.parse_msg(10) msg.fn = 100 msg.tn = 0 # Append one to the end of the capture ddf.append_msg(msg) Change-Id: I1b31183bd7bcca94de089847ee0b2f4ec88a7f1d
This commit is contained in:
parent
041bfc0b03
commit
afd110a3b5
|
@ -0,0 +1,379 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# TRX Toolkit
|
||||
# Helpers for DATA capture management
|
||||
#
|
||||
# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
|
||||
#
|
||||
# All Rights Reserved
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import struct
|
||||
|
||||
from data_msg import *
|
||||
|
||||
class DATADump:
|
||||
# Constants
|
||||
TAG_L12TRX = '\x01'
|
||||
TAG_TRX2L1 = '\x02'
|
||||
HDR_LENGTH = 2
|
||||
|
||||
# Generates raw bytes from a DATA message
|
||||
# Return value: raw message bytes
|
||||
def dump_msg(self, msg):
|
||||
# Determine a message type
|
||||
if isinstance(msg, DATAMSG_L12TRX):
|
||||
tag = self.TAG_L12TRX
|
||||
elif isinstance(msg, DATAMSG_TRX2L1):
|
||||
tag = self.TAG_TRX2L1
|
||||
else:
|
||||
raise ValueError("Unknown message type")
|
||||
|
||||
# Generate a message payload
|
||||
msg_raw = msg.gen_msg()
|
||||
|
||||
# Calculate the length
|
||||
msg_len = len(msg_raw)
|
||||
|
||||
# Concatenate a message with header
|
||||
return bytearray([tag, msg_len]) + msg_raw
|
||||
|
||||
def parse_hdr(self, hdr):
|
||||
# Extract the header info
|
||||
msg_len = struct.unpack("<B", hdr[1])[0]
|
||||
tag = hdr[0]
|
||||
|
||||
# Check if tag is known
|
||||
if tag == self.TAG_L12TRX:
|
||||
# L1 -> TRX
|
||||
msg = DATAMSG_L12TRX()
|
||||
elif tag == self.TAG_TRX2L1:
|
||||
# TRX -> L1
|
||||
msg = DATAMSG_TRX2L1()
|
||||
else:
|
||||
# Unknown tag
|
||||
return False
|
||||
|
||||
return (msg, msg_len)
|
||||
|
||||
class DATADumpFile(DATADump):
|
||||
def __init__(self, capture):
|
||||
# Check if capture file is already opened
|
||||
if type(capture) is file:
|
||||
self.f = capture
|
||||
else:
|
||||
print("[i] Opening capture file '%s'..." % capture)
|
||||
self.f = open(capture, "a+b")
|
||||
|
||||
def __del__(self):
|
||||
print("[i] Closing the capture file")
|
||||
self.f.close()
|
||||
|
||||
# Moves the file descriptor before a specified message
|
||||
# Return value:
|
||||
# True in case of success,
|
||||
# or False in case of EOF or header parsing error.
|
||||
def _seek2msg(self, idx):
|
||||
# Seek to the begining of the capture
|
||||
self.f.seek(0)
|
||||
|
||||
# Read the capture in loop...
|
||||
for i in range(idx):
|
||||
# Attempt to read a message header
|
||||
hdr_raw = self.f.read(self.HDR_LENGTH)
|
||||
if len(hdr_raw) != self.HDR_LENGTH:
|
||||
return False
|
||||
|
||||
# Attempt to parse it
|
||||
rc = self.parse_hdr(hdr_raw)
|
||||
if rc is False:
|
||||
print("[!] Couldn't parse a message header")
|
||||
return False
|
||||
|
||||
# Expand the header
|
||||
(_, msg_len) = rc
|
||||
|
||||
# Skip a message
|
||||
self.f.seek(msg_len, 1)
|
||||
|
||||
return True
|
||||
|
||||
# Parses a single message at the current descriptor position
|
||||
# Return value:
|
||||
# a parsed message in case of success,
|
||||
# or None in case of EOF or header parsing error,
|
||||
# or False in case of message parsing error.
|
||||
def _parse_msg(self):
|
||||
# Attempt to read a message header
|
||||
hdr_raw = self.f.read(self.HDR_LENGTH)
|
||||
if len(hdr_raw) != self.HDR_LENGTH:
|
||||
return None
|
||||
|
||||
# Attempt to parse it
|
||||
rc = self.parse_hdr(hdr_raw)
|
||||
if rc is False:
|
||||
print("[!] Couldn't parse a message header")
|
||||
return None
|
||||
|
||||
# Expand the header
|
||||
(msg, msg_len) = rc
|
||||
|
||||
# Attempt to read a message
|
||||
msg_raw = self.f.read(msg_len)
|
||||
if len(msg_raw) != msg_len:
|
||||
print("[!] Message length mismatch")
|
||||
return None
|
||||
|
||||
# Attempt to parse a message
|
||||
try:
|
||||
msg_raw = bytearray(msg_raw)
|
||||
msg.parse_msg(msg_raw)
|
||||
except:
|
||||
print("[!] Couldn't parse a message, skipping...")
|
||||
return False
|
||||
|
||||
# Success
|
||||
return msg
|
||||
|
||||
# Parses a particular message defined by index idx
|
||||
# Return value:
|
||||
# a parsed message in case of success,
|
||||
# or None in case of EOF or header parsing error,
|
||||
# or False in case of message parsing error or out of range.
|
||||
def parse_msg(self, idx):
|
||||
# Move descriptor to the begining of requested message
|
||||
rc = self._seek2msg(idx)
|
||||
if not rc:
|
||||
print("[!] Couldn't find requested message")
|
||||
return False
|
||||
|
||||
# Attempt to parse a message
|
||||
return self._parse_msg()
|
||||
|
||||
# Parses all messages from a given file
|
||||
# Return value:
|
||||
# list of parsed messages,
|
||||
# or False in case of range error.
|
||||
def parse_all(self, skip = None, count = None):
|
||||
result = []
|
||||
|
||||
# Should we skip some messages?
|
||||
if skip is None:
|
||||
# Seek to the begining of the capture
|
||||
self.f.seek(0)
|
||||
else:
|
||||
rc = self._seek2msg(skip)
|
||||
if not rc:
|
||||
print("[!] Couldn't find requested message")
|
||||
return False
|
||||
|
||||
# Read the capture in loop...
|
||||
while True:
|
||||
# Attempt to parse a message
|
||||
msg = self._parse_msg()
|
||||
|
||||
# EOF or broken header
|
||||
if msg is None:
|
||||
break
|
||||
|
||||
# Skip unparsed messages
|
||||
if msg is False:
|
||||
continue
|
||||
|
||||
# Success, append a message
|
||||
result.append(msg)
|
||||
|
||||
# Count limitation
|
||||
if count is not None:
|
||||
if len(result) == count:
|
||||
break
|
||||
|
||||
return result
|
||||
|
||||
# Writes a new message at the end of the capture
|
||||
def append_msg(self, msg):
|
||||
# Generate raw bytes and write
|
||||
msg_raw = self.dump_msg(msg)
|
||||
self.f.write(msg_raw)
|
||||
|
||||
# Writes a list of messages at the end of the capture
|
||||
def append_all(self, msgs):
|
||||
for msg in msgs:
|
||||
self.append_msg(msg)
|
||||
|
||||
# Regression tests
|
||||
if __name__ == '__main__':
|
||||
from tempfile import TemporaryFile
|
||||
from gsm_shared import *
|
||||
import random
|
||||
|
||||
# Create a temporary file
|
||||
tf = TemporaryFile()
|
||||
|
||||
# Create an instance of DATA dump manager
|
||||
ddf = DATADumpFile(tf)
|
||||
|
||||
# Generate two random bursts
|
||||
burst_l12trx = []
|
||||
burst_trx2l1 = []
|
||||
|
||||
for i in range(0, GSM_BURST_LEN):
|
||||
ubit = random.randint(0, 1)
|
||||
burst_l12trx.append(ubit)
|
||||
|
||||
sbit = random.randint(-127, 127)
|
||||
burst_trx2l1.append(sbit)
|
||||
|
||||
# Generate a basic list of random messages
|
||||
print("[i] Generating the reference messages")
|
||||
messages_ref = []
|
||||
|
||||
for i in range(100):
|
||||
# Create a message
|
||||
if i % 2:
|
||||
msg = DATAMSG_L12TRX()
|
||||
msg.burst = burst_l12trx
|
||||
else:
|
||||
msg = DATAMSG_TRX2L1()
|
||||
msg.burst = burst_trx2l1
|
||||
|
||||
# Randomize the header
|
||||
msg.rand_hdr()
|
||||
|
||||
# HACK: as ToA parsing is not implemented yet,
|
||||
# we have to use a fixed 0.00 value for now...
|
||||
if isinstance(msg, DATAMSG_TRX2L1):
|
||||
msg.toa = 0.00
|
||||
|
||||
# Append
|
||||
messages_ref.append(msg)
|
||||
|
||||
print("[i] Adding the following messages to the capture:")
|
||||
for msg in messages_ref[:3]:
|
||||
print(" %s: burst_len=%d"
|
||||
% (msg.desc_hdr(), len(msg.burst)))
|
||||
|
||||
# Check single message appending
|
||||
ddf.append_msg(messages_ref[0])
|
||||
ddf.append_msg(messages_ref[1])
|
||||
ddf.append_msg(messages_ref[2])
|
||||
|
||||
# Read the written messages back
|
||||
messages_check = ddf.parse_all()
|
||||
|
||||
print("[i] Read the following messages back:")
|
||||
for msg in messages_check:
|
||||
print(" %s: burst_len=%d"
|
||||
% (msg.desc_hdr(), len(msg.burst)))
|
||||
|
||||
# Expecting three messages
|
||||
assert(len(messages_check) == 3)
|
||||
|
||||
# Check the messages
|
||||
for i in range(3):
|
||||
# Compare common header parts and bursts
|
||||
assert(messages_check[i].burst == messages_ref[i].burst)
|
||||
assert(messages_check[i].fn == messages_ref[i].fn)
|
||||
assert(messages_check[i].tn == messages_ref[i].tn)
|
||||
|
||||
# HACK: as ToA parsing is not implemented yet,
|
||||
# we have to use a fixed 0.00 value for now...
|
||||
messages_check[i].toa = 0.00
|
||||
|
||||
# Validate a message
|
||||
assert(messages_check[i].validate())
|
||||
|
||||
print("[?] Check append_msg(): OK")
|
||||
|
||||
|
||||
# Append the pending reference messages
|
||||
ddf.append_all(messages_ref[3:])
|
||||
|
||||
# Read the written messages back
|
||||
messages_check = ddf.parse_all()
|
||||
|
||||
# Check the final amount
|
||||
assert(len(messages_check) == len(messages_ref))
|
||||
|
||||
# Check the messages
|
||||
for i in range(len(messages_check)):
|
||||
# Compare common header parts and bursts
|
||||
assert(messages_check[i].burst == messages_ref[i].burst)
|
||||
assert(messages_check[i].fn == messages_ref[i].fn)
|
||||
assert(messages_check[i].tn == messages_ref[i].tn)
|
||||
|
||||
# HACK: as ToA parsing is not implemented yet,
|
||||
# we have to use a fixed 0.00 value for now...
|
||||
messages_check[i].toa = 0.00
|
||||
|
||||
# Validate a message
|
||||
assert(messages_check[i].validate())
|
||||
|
||||
print("[?] Check append_all(): OK")
|
||||
|
||||
|
||||
# Check parse_msg()
|
||||
msg0 = ddf.parse_msg(0)
|
||||
msg10 = ddf.parse_msg(10)
|
||||
|
||||
# Make sure parsing was successful
|
||||
assert(msg0 and msg10)
|
||||
|
||||
# Compare common header parts and bursts
|
||||
assert(msg0.burst == messages_ref[0].burst)
|
||||
assert(msg0.fn == messages_ref[0].fn)
|
||||
assert(msg0.tn == messages_ref[0].tn)
|
||||
|
||||
assert(msg10.burst == messages_ref[10].burst)
|
||||
assert(msg10.fn == messages_ref[10].fn)
|
||||
assert(msg10.tn == messages_ref[10].tn)
|
||||
|
||||
# HACK: as ToA parsing is not implemented yet,
|
||||
# we have to use a fixed 0.00 value for now...
|
||||
msg0.toa = 0.00
|
||||
msg10.toa = 0.00
|
||||
|
||||
# Validate both messages
|
||||
assert(msg0.validate())
|
||||
assert(msg10.validate())
|
||||
|
||||
print("[?] Check parse_msg(): OK")
|
||||
|
||||
|
||||
# Check parse_all() with range
|
||||
messages_check = ddf.parse_all(skip = 10, count = 20)
|
||||
|
||||
# Make sure parsing was successful
|
||||
assert(messages_check)
|
||||
|
||||
# Check the amount
|
||||
assert(len(messages_check) == 20)
|
||||
|
||||
for i in range(20):
|
||||
# Compare common header parts and bursts
|
||||
assert(messages_check[i].burst == messages_ref[i + 10].burst)
|
||||
assert(messages_check[i].fn == messages_ref[i + 10].fn)
|
||||
assert(messages_check[i].tn == messages_ref[i + 10].tn)
|
||||
|
||||
# HACK: as ToA parsing is not implemented yet,
|
||||
# we have to use a fixed 0.00 value for now...
|
||||
messages_check[i].toa = 0.00
|
||||
|
||||
# Validate a message
|
||||
assert(messages_check[i].validate())
|
||||
|
||||
print("[?] Check parse_all(): OK")
|
Loading…
Reference in New Issue