trx_toolkit/data_msg.py: raise exceptions from validate() methods

Raising exceptions is a Pythonic way to handle errors, which in this
particular case will help us to know *why* exactly a given message
is incorrect or incomplete.

Change-Id: Ia961f83c717066af61699c80536468392b8ce064
This commit is contained in:
Vadim Yanitskiy 2019-08-27 20:24:19 +02:00
parent ad84c7b2f7
commit 6af5d8da2c
3 changed files with 74 additions and 61 deletions

View File

@ -296,7 +296,7 @@ if __name__ == '__main__':
assert(messages_check[i].tn == messages_ref[i].tn)
# Validate a message
assert(messages_check[i].validate())
messages_check[i].validate()
log.info("Check append_msg(): OK")
@ -318,7 +318,7 @@ if __name__ == '__main__':
assert(messages_check[i].tn == messages_ref[i].tn)
# Validate a message
assert(messages_check[i].validate())
messages_check[i].validate()
log.info("Check append_all(): OK")
@ -340,8 +340,8 @@ if __name__ == '__main__':
assert(msg10.tn == messages_ref[10].tn)
# Validate both messages
assert(msg0.validate())
assert(msg10.validate())
msg0.validate()
msg10.validate()
log.info("Check parse_msg(): OK")
@ -362,6 +362,6 @@ if __name__ == '__main__':
assert(messages_check[i].tn == messages_ref[i + 10].tn)
# Validate a message
assert(messages_check[i].validate())
messages_check[i].validate()
log.info("Check parse_all(): OK")

View File

@ -107,8 +107,7 @@ class DATAInterface(UDPLink):
def send_msg(self, msg, legacy = False):
# Validate a message
if not msg.validate():
raise ValueError("Message incomplete or incorrect")
msg.validate()
# Generate TRX message
payload = msg.gen_msg(legacy)

View File

@ -227,30 +227,27 @@ class DATAMSG:
return buf
# Validates the message fields
# Validates the message fields (throws ValueError)
def validate(self):
if not self.ver in self.known_versions:
return False
raise ValueError("Unknown TRXD header version")
if self.fn is None:
return False
raise ValueError("TDMA frame-number is not set")
if self.fn < 0 or self.fn > GSM_HYPERFRAME:
return False
raise ValueError("TDMA frame-number %d is out of range" % self.fn)
if self.tn is None:
return False
raise ValueError("TDMA time-slot is not set")
if self.tn < 0 or self.tn > 7:
return False
return True
raise ValueError("TDMA time-slot %d is out of range" % self.tn)
# Generates a TRX DATA message
def gen_msg(self, legacy = False):
# Validate all the fields
if not self.validate():
raise ValueError("Message incomplete or incorrect")
self.validate()
# Allocate an empty byte-array
buf = bytearray()
@ -342,27 +339,24 @@ class DATAMSG_L12TRX(DATAMSG):
return length
# Validates the message fields
# Validates the message fields (throws ValueError)
def validate(self):
# Validate common fields
if not DATAMSG.validate(self):
return False
DATAMSG.validate(self)
if self.pwr is None:
return False
raise ValueError("Tx Attenuation level is not set")
if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:
return False
raise ValueError("Tx Attenuation %d is out of range" % self.pwr)
# FIXME: properly handle IDLE / NOPE indications
if self.burst is None:
return False
raise ValueError("Tx burst bits are not set")
# FIXME: properly handle IDLE / NOPE indications
if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
return False
return True
raise ValueError("Tx burst has odd length %u" % len(self.burst))
# Generates a random power level
def rand_pwr(self, min = None, max = None):
@ -582,85 +576,77 @@ class DATAMSG_TRX2L1(DATAMSG):
def _validate_burst_v0(self):
# Burst is mandatory
if self.burst is None:
return False
raise ValueError("Rx burst bits are not set")
# ... and can be either of GSM (GMSK) or EDGE (8-PSK)
if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
return False
return True
raise ValueError("Rx burst has odd length %u" % len(self.burst))
def _validate_burst_v1(self):
# Burst is omitted in case of an IDLE / NOPE indication
if self.nope_ind and self.burst is None:
return True
if self.nope_ind and self.burst is not None:
return False
return
if self.nope_ind and self.burst is not None:
raise ValueError("NOPE.ind comes with burst?!?")
if self.burst is None:
return False
raise ValueError("Rx burst bits are not set")
# Burst length depends on modulation type
if len(self.burst) != self.mod_type.bl:
return False
raise ValueError("Rx burst has odd length %u" % len(self.burst))
return True
# Validates the burst
# Validates the burst (throws ValueError)
def validate_burst(self):
if self.ver == 0x00:
return self._validate_burst_v0()
self._validate_burst_v0()
elif self.ver >= 0x01:
return self._validate_burst_v1()
self._validate_burst_v1()
# Validates the message header fields
# Validates the message header fields (throws ValueError)
def validate(self):
# Validate common fields
if not DATAMSG.validate(self):
return False
DATAMSG.validate(self)
if self.rssi is None:
return False
raise ValueError("RSSI is not set")
if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX:
return False
raise ValueError("RSSI %d is out of range" % self.rssi)
if self.toa256 is None:
return False
raise ValueError("ToA256 is not set")
if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:
return False
raise ValueError("ToA256 %d is out of range" % self.toa256)
if self.ver >= 0x01:
if type(self.mod_type) is not Modulation:
return False
raise ValueError("Unknown Rx modulation type")
if self.tsc_set is None:
return False
raise ValueError("TSC set is not set")
if self.mod_type is Modulation.ModGMSK:
if self.tsc_set not in range(0, 4):
return False
raise ValueError("TSC set %d is out of range" % self.tsc_set)
else:
if self.tsc_set not in range(0, 2):
return False
raise ValueError("TSC set %d is out of range" % self.tsc_set)
if self.tsc is None:
return False
raise ValueError("TSC is not set")
if self.tsc not in self.TSC_RANGE:
return False
raise ValueError("TSC %d is out of range" % self.tsc)
if self.ci is None:
return False
raise ValueError("C/I is not set")
if self.ci < self.CI_MIN or self.ci > self.CI_MAX:
return False
raise ValueError("C/I %d is out of range" % self.ci)
if not self.validate_burst():
return False
return True
self.validate_burst()
# Generates a random RSSI value
def rand_rssi(self, min = None, max = None):
@ -886,11 +872,39 @@ if __name__ == '__main__':
msg_l12trx_ref.rand_burst()
msg_trx2l1_ref.rand_burst()
assert(msg_l12trx_ref.validate())
assert(msg_trx2l1_ref.validate())
msg_l12trx_ref.validate()
msg_trx2l1_ref.validate()
log.info("Validate header randomization: OK")
# Test error handling for common fields
msg = DATAMSG()
# Make sure that message validation throws a ValueError
def validate_throw(msg):
try:
msg.validate()
return False
except ValueError:
return True
# Unknown version
msg.rand_hdr()
msg.ver = 100
assert(validate_throw(msg))
# Uninitialized field
msg.rand_hdr()
msg.fn = None
assert(validate_throw(msg))
# Out-of-range value
msg.rand_hdr()
msg.tn = 10
assert(validate_throw(msg))
log.info("Check incorrect message validation: OK")
log.info("Encoding the reference messages")
# Encode DATA messages