trx_toolkit/data_msg.py: convert comments into docstrings

Change-Id: I856b54fd1baca4ae0edd2aa59be6a76372cef667
Related: OS#4006, SYS#4895
This commit is contained in:
Vadim Yanitskiy 2021-02-24 23:26:38 +01:00 committed by fixeria
parent 904d128c4d
commit 7a31e98936
1 changed files with 75 additions and 47 deletions

View File

@ -122,54 +122,53 @@ class DATAMSG:
CHDR_VERSION_MAX = 0b1111
known_versions = [0x00, 0x01]
# Common constructor
def __init__(self, fn = None, tn = None, burst = None, ver = 0):
self.burst = burst
self.ver = ver
self.fn = fn
self.tn = tn
# The common header length
@property
def CHDR_LEN(self):
# (VER + TN) + FN
return 1 + 4
''' The common header length. '''
return 1 + 4 # (VER + TN) + FN
# Generates message specific header
def gen_hdr(self):
''' Generate message specific header. '''
raise NotImplementedError
# Parses message specific header
def parse_hdr(self, hdr):
''' Parse message specific header. '''
raise NotImplementedError
# Generates message specific burst
def gen_burst(self):
''' Generate message specific burst. '''
raise NotImplementedError
# Parses message specific burst
def parse_burst(self, burst):
''' Parse message specific burst. '''
raise NotImplementedError
# Generate a random message specific burst
def rand_burst(self):
''' Generate a random message specific burst. '''
raise NotImplementedError
# Generates a random frame number
def rand_fn(self):
''' Generate a random frame number. '''
return random.randint(0, GSM_HYPERFRAME)
# Generates a random timeslot number
def rand_tn(self):
''' Generate a random timeslot number. '''
return random.randint(0, 7)
# Randomizes the message header
def rand_hdr(self):
''' Randomize the message header. '''
self.fn = self.rand_fn()
self.tn = self.rand_tn()
# Generates human-readable header description
def desc_hdr(self):
''' Generate human-readable header description. '''
result = ""
if self.ver > 0:
@ -186,28 +185,29 @@ class DATAMSG:
return result
# Converts unsigned soft-bits {254..0} to soft-bits {-127..127}
@staticmethod
def usbit2sbit(bits):
''' Convert unsigned soft-bits {254..0} to soft-bits {-127..127}. '''
return [-127 if (b == 0xff) else 127 - b for b in bits]
# Converts soft-bits {-127..127} to unsigned soft-bits {254..0}
@staticmethod
def sbit2usbit(bits):
''' Convert soft-bits {-127..127} to unsigned soft-bits {254..0}. '''
return [127 - b for b in bits]
# Converts soft-bits {-127..127} to bits {1..0}
@staticmethod
def sbit2ubit(bits):
''' Convert soft-bits {-127..127} to bits {1..0}. '''
return [int(b < 0) for b in bits]
# Converts bits {1..0} to soft-bits {-127..127}
@staticmethod
def ubit2sbit(bits):
''' Convert bits {1..0} to soft-bits {-127..127}. '''
return [-127 if b else 127 for b in bits]
# Validates the message fields (throws ValueError)
def validate(self):
''' Validate the message fields (throws ValueError). '''
if not self.ver in self.known_versions:
raise ValueError("Unknown TRXD header version %d" % self.ver)
@ -223,8 +223,9 @@ class DATAMSG:
if self.tn < 0 or self.tn > 7:
raise ValueError("TDMA time-slot %d is out of range" % self.tn)
# Generates a TRX DATA message
def gen_msg(self, legacy = False):
''' Generate a TRX DATA message. '''
# Validate all the fields
self.validate()
@ -252,8 +253,9 @@ class DATAMSG:
return buf
# Parses a TRX DATA message
def parse_msg(self, msg):
''' Parse a TRX DATA message. '''
# Make sure we have at least common header
if len(msg) < self.CHDR_LEN:
raise ValueError("Message is to short: missing common header")
@ -311,9 +313,10 @@ class DATAMSG_L12TRX(DATAMSG):
# Specific message fields
pwr = None
# Calculates header length depending on its version
@property
def HDR_LEN(self):
''' Calculate header length depending on its version. '''
# Common header length
length = self.CHDR_LEN
@ -325,8 +328,9 @@ class DATAMSG_L12TRX(DATAMSG):
return length
# Validates the message fields (throws ValueError)
def validate(self):
''' Validate the message fields (throws ValueError). '''
# Validate common fields
DATAMSG.validate(self)
@ -344,8 +348,9 @@ class DATAMSG_L12TRX(DATAMSG):
if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
raise ValueError("Tx burst has odd length %u" % len(self.burst))
# Generates a random power level
def rand_pwr(self, min = None, max = None):
''' Generate a random power level. '''
if min is None:
min = self.PWR_MIN
@ -354,13 +359,15 @@ class DATAMSG_L12TRX(DATAMSG):
return random.randint(min, max)
# Randomizes message specific header
def rand_hdr(self):
''' Randomize message specific header. '''
DATAMSG.rand_hdr(self)
self.pwr = self.rand_pwr()
# Generates human-readable header description
def desc_hdr(self):
''' Generate human-readable header description. '''
# Describe the common part
result = DATAMSG.desc_hdr(self)
@ -370,8 +377,9 @@ class DATAMSG_L12TRX(DATAMSG):
# Strip useless whitespace and return
return result.strip()
# Generates message specific header part
def gen_hdr(self):
''' Generate message specific header part. '''
# Allocate an empty byte-array
buf = bytearray()
@ -380,18 +388,21 @@ class DATAMSG_L12TRX(DATAMSG):
return buf
# Parses message specific header part
def parse_hdr(self, hdr):
''' Parse message specific header part. '''
# Parse power level
self.pwr = hdr[5]
# Generates message specific burst
def gen_burst(self):
''' Generate message specific burst. '''
# Copy burst 'as is'
return bytearray(self.burst)
# Parses message specific burst
def parse_burst(self, burst):
''' Parse message specific burst. '''
length = len(burst)
# Distinguish between GSM and EDGE
@ -400,12 +411,13 @@ class DATAMSG_L12TRX(DATAMSG):
else:
self.burst = list(burst[:GSM_BURST_LEN])
# Generate a random message specific burst
def rand_burst(self, length = GSM_BURST_LEN):
''' Generate a random message specific burst. '''
self.burst = [random.randint(0, 1) for _ in range(length)]
# Transforms this message to TRX2L1 message
def gen_trx2l1(self, ver = None):
''' Transform this message to TRX2L1 message. '''
# Allocate a new message
msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn,
ver = self.ver if ver is None else ver)
@ -539,9 +551,10 @@ class DATAMSG_TRX2L1(DATAMSG):
tsc = None
ci = None
# Calculates header length depending on its version
@property
def HDR_LEN(self):
''' Calculate header length depending on its version. '''
# Common header length
length = self.CHDR_LEN
@ -580,15 +593,17 @@ class DATAMSG_TRX2L1(DATAMSG):
if len(self.burst) != self.mod_type.bl:
raise ValueError("Rx burst has odd length %u" % len(self.burst))
# Validates the burst (throws ValueError)
def validate_burst(self):
''' Validate the burst (throws ValueError). '''
if self.ver == 0x00:
self._validate_burst_v0()
elif self.ver >= 0x01:
self._validate_burst_v1()
# Validates the message header fields (throws ValueError)
def validate(self):
''' Validate the message header fields (throws ValueError). '''
# Validate common fields
DATAMSG.validate(self)
@ -635,8 +650,9 @@ class DATAMSG_TRX2L1(DATAMSG):
self.validate_burst()
# Generates a random RSSI value
def rand_rssi(self, min = None, max = None):
''' Generate a random RSSI value. '''
if min is None:
min = self.RSSI_MIN
@ -645,8 +661,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return random.randint(min, max)
# Generates a ToA (Time of Arrival) value
def rand_toa256(self, min = None, max = None):
''' Generate a random ToA (Time of Arrival) value. '''
if min is None:
min = self.TOA256_MIN
@ -655,8 +672,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return random.randint(min, max)
# Randomizes message specific header
def rand_hdr(self):
''' Randomize message specific header. '''
DATAMSG.rand_hdr(self)
self.rssi = self.rand_rssi()
self.toa256 = self.rand_toa256()
@ -672,8 +690,9 @@ class DATAMSG_TRX2L1(DATAMSG):
# C/I: Carrier-to-Interference ratio
self.ci = random.randint(self.CI_MIN, self.CI_MAX)
# Generates human-readable header description
def desc_hdr(self):
''' Generate human-readable header description. '''
# Describe the common part
result = DATAMSG.desc_hdr(self)
@ -699,8 +718,9 @@ class DATAMSG_TRX2L1(DATAMSG):
# Strip useless whitespace and return
return result.strip()
# Encodes Modulation and Training Sequence info
def gen_mts(self):
''' Encode Modulation and Training Sequence info. '''
# IDLE / nope indication has no MTS info
if self.nope_ind:
return self.NOPE_IND
@ -714,8 +734,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return mts
# Parses Modulation and Training Sequence info
def parse_mts(self, mts):
''' Parse Modulation and Training Sequence info. '''
# IDLE / nope indication has no MTS info
self.nope_ind = (mts & self.NOPE_IND) > 0
if self.nope_ind:
@ -738,8 +759,9 @@ class DATAMSG_TRX2L1(DATAMSG):
self.mod_type = Modulation.ModGMSK
self.tsc_set = mts & 0b11
# Generates message specific header part
def gen_hdr(self):
''' Generate message specific header part. '''
# Allocate an empty byte-array
buf = bytearray()
@ -760,8 +782,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return buf
# Parses message specific header part
def parse_hdr(self, hdr):
''' Parse message specific header part. '''
# Parse RSSI
self.rssi = -(hdr[5])
@ -775,16 +798,18 @@ class DATAMSG_TRX2L1(DATAMSG):
# C/I: Carrier-to-Interference ratio (in centiBels)
self.ci = struct.unpack(">h", hdr[9:11])[0]
# Generates message specific burst
def gen_burst(self):
''' Generate message specific burst. '''
# Convert soft-bits to unsigned soft-bits
burst_usbits = self.sbit2usbit(self.burst)
# Encode to bytes
return bytearray(burst_usbits)
# Parses message specific burst for header version 0
def _parse_burst_v0(self, burst):
''' Parse message specific burst for header version 0. '''
bl = len(burst)
# We need to guess modulation by the length of burst
@ -798,8 +823,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return burst[:self.mod_type.bl]
# Parses message specific burst
def parse_burst(self, burst):
''' Parse message specific burst. '''
burst = list(burst)
if self.ver == 0x00:
@ -808,15 +834,17 @@ class DATAMSG_TRX2L1(DATAMSG):
# Convert unsigned soft-bits to soft-bits
self.burst = self.usbit2sbit(burst)
# Generate a random message specific burst
def rand_burst(self, length = None):
''' Generate a random message specific burst. '''
if length is None:
length = self.mod_type.bl
self.burst = [random.randint(-127, 127) for _ in range(length)]
# Transforms this message to L12TRX message
def gen_l12trx(self, ver = None):
''' Transform this message to L12TRX message. '''
# Allocate a new message
msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn,
ver = self.ver if ver is None else ver)