trx_toolkit: use generic logging module instead of print()
There are multiple advantages of using Python's logging module: - advanced message formatting (file name, line number, etc.), - multiple logging targets (e.g. stderr, file, socket), - logging levels (e.g. DEBUG, INFO, ERROR), - the pythonic way ;) so, let's replace multiple print() calls by logging calls, add use the following logging message format by default: [%(levelname)s] %(filename)s:%(lineno)d %(message)s Examples: [INFO] ctrl_if_bts.py:57 Starting transceiver... [DEBUG] clck_gen.py:87 IND CLOCK 26826 [DEBUG] ctrl_if_bts.py:71 Recv POWEROFF cmd [INFO] ctrl_if_bts.py:73 Stopping transceiver... [INFO] fake_trx.py:127 Shutting down... Please note that there is no way to filter messages by logging level yet. This is to be introduced soon, together with argparse. Change-Id: I7fcafabafe8323b58990997a47afdd48b6d1f357
This commit is contained in:
parent
72c8296bfe
commit
6bab6acee6
|
@ -22,6 +22,7 @@
|
|||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import logging as log
|
||||
import random
|
||||
|
||||
from data_msg import *
|
||||
|
@ -214,7 +215,7 @@ class BurstForwarder:
|
|||
# Burst dropping
|
||||
if self.burst_dl_drop_amount > 0:
|
||||
if msg.fn % self.burst_dl_drop_period == 0:
|
||||
print("[~] Simulation: dropping DL burst (fn=%u %% %u == 0)"
|
||||
log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)"
|
||||
% (msg.fn, self.burst_dl_drop_period))
|
||||
self.burst_dl_drop_amount -= 1
|
||||
return None
|
||||
|
@ -226,7 +227,7 @@ class BurstForwarder:
|
|||
# Burst dropping
|
||||
if self.burst_ul_drop_amount > 0:
|
||||
if msg.fn % self.burst_ul_drop_period == 0:
|
||||
print("[~] Simulation: dropping UL burst (fn=%u %% %u == 0)"
|
||||
log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)"
|
||||
% (msg.fn, self.burst_ul_drop_period))
|
||||
self.burst_ul_drop_amount -= 1
|
||||
return None
|
||||
|
@ -254,7 +255,7 @@ class BurstForwarder:
|
|||
msg_l12trx = DATAMSG_L12TRX()
|
||||
msg_l12trx.parse_msg(bytearray(msg_raw))
|
||||
except:
|
||||
print("[!] Dropping unhandled DL message...")
|
||||
log.error("Dropping unhandled DL message...")
|
||||
return None
|
||||
|
||||
# Compose a new message for L1
|
||||
|
@ -320,7 +321,7 @@ class BurstForwarder:
|
|||
|
||||
# Timeslot filter
|
||||
if msg.tn not in self.ts_pass_list:
|
||||
print("[!] TS %u is not configured, dropping UL burst..." % msg.tn)
|
||||
log.warning("TS %u is not configured, dropping UL burst..." % msg.tn)
|
||||
return None
|
||||
|
||||
# Path loss simulation
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
from copyright import print_copyright
|
||||
CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
|
||||
|
||||
import logging as log
|
||||
import signal
|
||||
import getopt
|
||||
import sys
|
||||
|
@ -64,6 +65,10 @@ class Application:
|
|||
# Set up signal handlers
|
||||
signal.signal(signal.SIGINT, self.sig_handler)
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
# Open requested capture file
|
||||
if self.output_file is not None:
|
||||
self.ddf = DATADumpFile(self.output_file)
|
||||
|
@ -130,7 +135,7 @@ class Application:
|
|||
# Set burst
|
||||
msg.burst = burst
|
||||
|
||||
print("[i] Sending %d/%d %s burst %s to %s..."
|
||||
log.info("Sending %d/%d %s burst %s to %s..."
|
||||
% (i + 1, self.burst_count, self.burst_type,
|
||||
msg.desc_hdr(), self.conn_mode))
|
||||
|
||||
|
@ -239,7 +244,7 @@ class Application:
|
|||
sys.exit(2)
|
||||
|
||||
def sig_handler(self, signum, frame):
|
||||
print("Signal %d received" % signum)
|
||||
log.info("Signal %d received" % signum)
|
||||
if signum is signal.SIGINT:
|
||||
sys.exit(0)
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
from copyright import print_copyright
|
||||
CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
|
||||
|
||||
import logging as log
|
||||
import signal
|
||||
import getopt
|
||||
import sys
|
||||
|
@ -60,6 +61,10 @@ class Application:
|
|||
# Set up signal handlers
|
||||
signal.signal(signal.SIGINT, self.sig_handler)
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
# Open requested capture file
|
||||
self.ddf = DATADumpFile(self.capture_file)
|
||||
|
||||
|
@ -88,7 +93,7 @@ class Application:
|
|||
if not self.msg_pass_filter(l12trx, msg):
|
||||
continue
|
||||
|
||||
print("[i] Sending a burst %s to %s..."
|
||||
log.info("Sending a burst %s to %s..."
|
||||
% (msg.desc_hdr(), self.conn_mode))
|
||||
|
||||
# Send message
|
||||
|
@ -209,7 +214,7 @@ class Application:
|
|||
sys.exit(2)
|
||||
|
||||
def sig_handler(self, signum, frame):
|
||||
print("Signal %d received" % signum)
|
||||
log.info("Signal %d received" % signum)
|
||||
if signum is signal.SIGINT:
|
||||
sys.exit(0)
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
from copyright import print_copyright
|
||||
CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
|
||||
|
||||
import logging as log
|
||||
import signal
|
||||
import time
|
||||
import sys
|
||||
|
@ -83,7 +84,7 @@ class CLCKGen:
|
|||
link.send(payload)
|
||||
|
||||
# Debug print
|
||||
print("[T] %s" % payload)
|
||||
log.debug(payload)
|
||||
|
||||
# Increase frame count
|
||||
self.clck_src += self.ind_period
|
||||
|
@ -101,13 +102,17 @@ class Application:
|
|||
# Set up signal handlers
|
||||
signal.signal(signal.SIGINT, self.sig_handler)
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
def run(self):
|
||||
self.link = UDPLink("127.0.0.1", 5800, "0.0.0.0", 5700)
|
||||
self.clck = CLCKGen([self.link], ind_period = 51)
|
||||
self.clck.start()
|
||||
|
||||
def sig_handler(self, signum, frame):
|
||||
print("Signal %d received" % signum)
|
||||
log.info("Signal %d received" % signum)
|
||||
if signum is signal.SIGINT:
|
||||
self.clck.stop()
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
from copyright import print_copyright
|
||||
CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
|
||||
|
||||
import logging as log
|
||||
import signal
|
||||
import getopt
|
||||
import select
|
||||
|
@ -48,12 +49,16 @@ class Application:
|
|||
# Set up signal handlers
|
||||
signal.signal(signal.SIGINT, self.sig_handler)
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
# Init UDP connection
|
||||
self.ctrl_link = UDPLink(self.remote_addr, self.base_port + 1,
|
||||
self.bind_addr, self.bind_port)
|
||||
|
||||
# Debug print
|
||||
print("[i] Init CTRL interface (%s)" \
|
||||
log.info("Init CTRL interface (%s)" \
|
||||
% self.ctrl_link.desc_link())
|
||||
|
||||
def print_help(self, msg = None):
|
||||
|
@ -138,7 +143,7 @@ class Application:
|
|||
sys.stdout.flush()
|
||||
|
||||
def sig_handler(self, signum, frame):
|
||||
print("\n\nSignal %d received" % signum)
|
||||
log.info("Signal %d received" % signum)
|
||||
if signum is signal.SIGINT:
|
||||
sys.exit(0)
|
||||
|
||||
|
|
|
@ -22,12 +22,14 @@
|
|||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import logging as log
|
||||
|
||||
from udp_link import UDPLink
|
||||
|
||||
class CTRLInterface(UDPLink):
|
||||
def handle_rx(self, data, remote):
|
||||
if not self.verify_req(data):
|
||||
print("[!] Wrong data on CTRL interface")
|
||||
log.error("Wrong data on CTRL interface")
|
||||
return
|
||||
|
||||
# Attempt to parse a command
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import logging as log
|
||||
|
||||
from ctrl_if import CTRLInterface
|
||||
|
||||
class CTRLInterfaceBB(CTRLInterface):
|
||||
|
@ -34,37 +36,37 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
|
||||
def __init__(self, remote_addr, remote_port, bind_addr, bind_port):
|
||||
CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port)
|
||||
print("[i] Init CTRL interface for BB (%s)" % self.desc_link())
|
||||
log.info("Init CTRL interface for BB (%s)" % self.desc_link())
|
||||
|
||||
def parse_cmd(self, request):
|
||||
# Power control
|
||||
if self.verify_cmd(request, "POWERON", 0):
|
||||
print("[i] Recv POWERON CMD")
|
||||
log.debug("Recv POWERON CMD")
|
||||
|
||||
# Ensure transceiver isn't working
|
||||
if self.trx_started:
|
||||
print("[!] Transceiver already started")
|
||||
log.error("Transceiver already started")
|
||||
return -1
|
||||
|
||||
# Ensure RX / TX freq. are set
|
||||
if (self.rx_freq is None) or (self.tx_freq is None):
|
||||
print("[!] RX / TX freq. are not set")
|
||||
log.error("RX / TX freq. are not set")
|
||||
return -1
|
||||
|
||||
print("[i] Starting transceiver...")
|
||||
log.info("Starting transceiver...")
|
||||
self.trx_started = True
|
||||
return 0
|
||||
|
||||
elif self.verify_cmd(request, "POWEROFF", 0):
|
||||
print("[i] Recv POWEROFF cmd")
|
||||
log.debug("Recv POWEROFF cmd")
|
||||
|
||||
print("[i] Stopping transceiver...")
|
||||
log.info("Stopping transceiver...")
|
||||
self.trx_started = False
|
||||
return 0
|
||||
|
||||
# Tuning Control
|
||||
elif self.verify_cmd(request, "RXTUNE", 1):
|
||||
print("[i] Recv RXTUNE cmd")
|
||||
log.debug("Recv RXTUNE cmd")
|
||||
|
||||
# TODO: check freq range
|
||||
self.rx_freq = int(request[1]) * 1000
|
||||
|
@ -72,7 +74,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
return 0
|
||||
|
||||
elif self.verify_cmd(request, "TXTUNE", 1):
|
||||
print("[i] Recv TXTUNE cmd")
|
||||
log.debug("Recv TXTUNE cmd")
|
||||
|
||||
# TODO: check freq range
|
||||
self.tx_freq = int(request[1]) * 1000
|
||||
|
@ -80,7 +82,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
|
||||
# Power measurement
|
||||
elif self.verify_cmd(request, "MEASURE", 1):
|
||||
print("[i] Recv MEASURE cmd")
|
||||
log.debug("Recv MEASURE cmd")
|
||||
|
||||
if self.pm is None:
|
||||
return -1
|
||||
|
@ -92,7 +94,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
return (0, [meas_dbm])
|
||||
|
||||
elif self.verify_cmd(request, "SETSLOT", 2):
|
||||
print("[i] Recv SETSLOT cmd")
|
||||
log.debug("Recv SETSLOT cmd")
|
||||
|
||||
if self.burst_fwd is None:
|
||||
return -1
|
||||
|
@ -100,7 +102,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
# Obtain TS index
|
||||
ts = int(request[1])
|
||||
if ts not in range(0, 8):
|
||||
print("[!] TS index should be in range: 0..7")
|
||||
log.error("TS index should be in range: 0..7")
|
||||
return -1
|
||||
|
||||
# Parse TS type
|
||||
|
@ -113,7 +115,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
if ts in self.burst_fwd.ts_pass_list:
|
||||
self.burst_fwd.ts_pass_list.remove(ts)
|
||||
else:
|
||||
print("[!] TS %u was not activated before" % ts)
|
||||
log.warning("TS %u was not activated before" % ts)
|
||||
# TODO: uncomment as soon as RESET is introduced
|
||||
# return -1
|
||||
else:
|
||||
|
@ -121,7 +123,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
if ts not in self.burst_fwd.ts_pass_list:
|
||||
self.burst_fwd.ts_pass_list.append(ts)
|
||||
else:
|
||||
print("[!] TS %u was already activated before" % ts)
|
||||
log.warning("TS %u was already activated before" % ts)
|
||||
# TODO: uncomment as soon as RESET is introduced
|
||||
# return -1
|
||||
|
||||
|
@ -129,7 +131,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
|
||||
# Timing Advance
|
||||
elif self.verify_cmd(request, "SETTA", 1):
|
||||
print("[i] Recv SETTA cmd")
|
||||
log.debug("Recv SETTA cmd")
|
||||
|
||||
# Save to the BurstForwarder instance
|
||||
self.burst_fwd.ta = ta
|
||||
|
@ -138,7 +140,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
# Timing of Arrival simulation for Uplink
|
||||
# Absolute form: CMD FAKE_TOA <BASE> <THRESH>
|
||||
elif self.verify_cmd(request, "FAKE_TOA", 2):
|
||||
print("[i] Recv FAKE_TOA cmd")
|
||||
log.debug("Recv FAKE_TOA cmd")
|
||||
|
||||
# Parse and apply both base and threshold
|
||||
self.burst_fwd.toa256_ul_base = int(request[1])
|
||||
|
@ -149,7 +151,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
# Timing of Arrival simulation for Uplink
|
||||
# Relative form: CMD FAKE_TOA <+-BASE_DELTA>
|
||||
elif self.verify_cmd(request, "FAKE_TOA", 1):
|
||||
print("[i] Recv FAKE_TOA cmd")
|
||||
log.debug("Recv FAKE_TOA cmd")
|
||||
|
||||
# Parse and apply delta
|
||||
self.burst_fwd.toa256_ul_base += int(request[1])
|
||||
|
@ -159,7 +161,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
# RSSI simulation for Uplink
|
||||
# Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
|
||||
elif self.verify_cmd(request, "FAKE_RSSI", 2):
|
||||
print("[i] Recv FAKE_RSSI cmd")
|
||||
log.debug("Recv FAKE_RSSI cmd")
|
||||
|
||||
# Parse and apply both base and threshold
|
||||
self.burst_fwd.rssi_ul_base = int(request[1])
|
||||
|
@ -170,7 +172,7 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
# RSSI simulation for Uplink
|
||||
# Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
|
||||
elif self.verify_cmd(request, "FAKE_RSSI", 1):
|
||||
print("[i] Recv FAKE_RSSI cmd")
|
||||
log.debug("Recv FAKE_RSSI cmd")
|
||||
|
||||
# Parse and apply delta
|
||||
self.burst_fwd.rssi_ul_base += int(request[1])
|
||||
|
@ -181,12 +183,12 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
# Syntax: CMD FAKE_DROP <AMOUNT>
|
||||
# Dropping pattern: fn % 1 == 0
|
||||
elif self.verify_cmd(request, "FAKE_DROP", 1):
|
||||
print("[i] Recv FAKE_DROP cmd")
|
||||
log.debug("Recv FAKE_DROP cmd")
|
||||
|
||||
# Parse / validate amount of bursts
|
||||
num = int(request[1])
|
||||
if num < 0:
|
||||
print("[!] FAKE_DROP amount shall not be negative")
|
||||
log.error("FAKE_DROP amount shall not be negative")
|
||||
return -1
|
||||
|
||||
self.burst_fwd.burst_ul_drop_amount = num
|
||||
|
@ -198,18 +200,18 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
# Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
|
||||
# Dropping pattern: fn % period == 0
|
||||
elif self.verify_cmd(request, "FAKE_DROP", 2):
|
||||
print("[i] Recv FAKE_DROP cmd")
|
||||
log.debug("Recv FAKE_DROP cmd")
|
||||
|
||||
# Parse / validate amount of bursts
|
||||
num = int(request[1])
|
||||
if num < 0:
|
||||
print("[!] FAKE_DROP amount shall not be negative")
|
||||
log.error("FAKE_DROP amount shall not be negative")
|
||||
return -1
|
||||
|
||||
# Parse / validate period
|
||||
period = int(request[2])
|
||||
if period <= 0:
|
||||
print("[!] FAKE_DROP period shall be greater than zero")
|
||||
log.error("FAKE_DROP period shall be greater than zero")
|
||||
return -1
|
||||
|
||||
self.burst_fwd.burst_ul_drop_amount = num
|
||||
|
@ -221,5 +223,5 @@ class CTRLInterfaceBB(CTRLInterface):
|
|||
else:
|
||||
# We don't care about other commands,
|
||||
# so let's merely ignore them ;)
|
||||
print("[i] Ignore CMD %s" % request[0])
|
||||
log.debug("Ignore CMD %s" % request[0])
|
||||
return 0
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import logging as log
|
||||
|
||||
from ctrl_if import CTRLInterface
|
||||
|
||||
class CTRLInterfaceBTS(CTRLInterface):
|
||||
|
@ -35,24 +37,24 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
|
||||
def __init__(self, remote_addr, remote_port, bind_addr, bind_port):
|
||||
CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port)
|
||||
print("[i] Init CTRL interface for BTS (%s)" % self.desc_link())
|
||||
log.info("Init CTRL interface for BTS (%s)" % self.desc_link())
|
||||
|
||||
def parse_cmd(self, request):
|
||||
# Power control
|
||||
if self.verify_cmd(request, "POWERON", 0):
|
||||
print("[i] Recv POWERON CMD")
|
||||
log.debug("Recv POWERON CMD")
|
||||
|
||||
# Ensure transceiver isn't working
|
||||
if self.trx_started:
|
||||
print("[!] Transceiver already started")
|
||||
log.error("Transceiver already started")
|
||||
return -1
|
||||
|
||||
# Ensure RX / TX freq. are set
|
||||
if (self.rx_freq is None) or (self.tx_freq is None):
|
||||
print("[!] RX / TX freq. are not set")
|
||||
log.error("RX / TX freq. are not set")
|
||||
return -1
|
||||
|
||||
print("[i] Starting transceiver...")
|
||||
log.info("Starting transceiver...")
|
||||
self.trx_started = True
|
||||
|
||||
# Power emulation
|
||||
|
@ -66,9 +68,9 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
return 0
|
||||
|
||||
elif self.verify_cmd(request, "POWEROFF", 0):
|
||||
print("[i] Recv POWEROFF cmd")
|
||||
log.debug("Recv POWEROFF cmd")
|
||||
|
||||
print("[i] Stopping transceiver...")
|
||||
log.info("Stopping transceiver...")
|
||||
self.trx_started = False
|
||||
|
||||
# Power emulation
|
||||
|
@ -83,14 +85,14 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
|
||||
# Tuning Control
|
||||
elif self.verify_cmd(request, "RXTUNE", 1):
|
||||
print("[i] Recv RXTUNE cmd")
|
||||
log.debug("Recv RXTUNE cmd")
|
||||
|
||||
# TODO: check freq range
|
||||
self.rx_freq = int(request[1]) * 1000
|
||||
return 0
|
||||
|
||||
elif self.verify_cmd(request, "TXTUNE", 1):
|
||||
print("[i] Recv TXTUNE cmd")
|
||||
log.debug("Recv TXTUNE cmd")
|
||||
|
||||
# TODO: check freq range
|
||||
self.tx_freq = int(request[1]) * 1000
|
||||
|
@ -100,7 +102,7 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
# Timing of Arrival simulation for Downlink
|
||||
# Absolute form: CMD FAKE_TOA <BASE> <THRESH>
|
||||
elif self.verify_cmd(request, "FAKE_TOA", 2):
|
||||
print("[i] Recv FAKE_TOA cmd")
|
||||
log.debug("Recv FAKE_TOA cmd")
|
||||
|
||||
# Parse and apply both base and threshold
|
||||
self.burst_fwd.toa256_dl_base = int(request[1])
|
||||
|
@ -111,7 +113,7 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
# Timing of Arrival simulation for Downlink
|
||||
# Relative form: CMD FAKE_TOA <+-BASE_DELTA>
|
||||
elif self.verify_cmd(request, "FAKE_TOA", 1):
|
||||
print("[i] Recv FAKE_TOA cmd")
|
||||
log.debug("Recv FAKE_TOA cmd")
|
||||
|
||||
# Parse and apply delta
|
||||
self.burst_fwd.toa256_dl_base += int(request[1])
|
||||
|
@ -121,7 +123,7 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
# RSSI simulation for Downlink
|
||||
# Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
|
||||
elif self.verify_cmd(request, "FAKE_RSSI", 2):
|
||||
print("[i] Recv FAKE_RSSI cmd")
|
||||
log.debug("Recv FAKE_RSSI cmd")
|
||||
|
||||
# Parse and apply both base and threshold
|
||||
self.burst_fwd.rssi_dl_base = int(request[1])
|
||||
|
@ -132,7 +134,7 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
# RSSI simulation for Downlink
|
||||
# Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
|
||||
elif self.verify_cmd(request, "FAKE_RSSI", 1):
|
||||
print("[i] Recv FAKE_RSSI cmd")
|
||||
log.debug("Recv FAKE_RSSI cmd")
|
||||
|
||||
# Parse and apply delta
|
||||
self.burst_fwd.rssi_dl_base += int(request[1])
|
||||
|
@ -143,12 +145,12 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
# Syntax: CMD FAKE_DROP <AMOUNT>
|
||||
# Dropping pattern: fn % 1 == 0
|
||||
elif self.verify_cmd(request, "FAKE_DROP", 1):
|
||||
print("[i] Recv FAKE_DROP cmd")
|
||||
log.debug("Recv FAKE_DROP cmd")
|
||||
|
||||
# Parse / validate amount of bursts
|
||||
num = int(request[1])
|
||||
if num < 0:
|
||||
print("[!] FAKE_DROP amount shall not be negative")
|
||||
log.error("FAKE_DROP amount shall not be negative")
|
||||
return -1
|
||||
|
||||
self.burst_fwd.burst_dl_drop_amount = num
|
||||
|
@ -160,18 +162,18 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
# Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
|
||||
# Dropping pattern: fn % period == 0
|
||||
elif self.verify_cmd(request, "FAKE_DROP", 2):
|
||||
print("[i] Recv FAKE_DROP cmd")
|
||||
log.debug("Recv FAKE_DROP cmd")
|
||||
|
||||
# Parse / validate amount of bursts
|
||||
num = int(request[1])
|
||||
if num < 0:
|
||||
print("[!] FAKE_DROP amount shall not be negative")
|
||||
log.error("FAKE_DROP amount shall not be negative")
|
||||
return -1
|
||||
|
||||
# Parse / validate period
|
||||
period = int(request[2])
|
||||
if period <= 0:
|
||||
print("[!] FAKE_DROP period shall be greater than zero")
|
||||
log.error("FAKE_DROP period shall be greater than zero")
|
||||
return -1
|
||||
|
||||
self.burst_fwd.burst_dl_drop_amount = num
|
||||
|
@ -183,5 +185,5 @@ class CTRLInterfaceBTS(CTRLInterface):
|
|||
else:
|
||||
# We don't care about other commands,
|
||||
# so let's merely ignore them ;)
|
||||
print("[i] Ignore CMD %s" % request[0])
|
||||
log.debug("Ignore CMD %s" % request[0])
|
||||
return 0
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
|
||||
import logging as log
|
||||
import struct
|
||||
|
||||
from data_msg import *
|
||||
|
@ -77,13 +78,15 @@ class DATADumpFile(DATADump):
|
|||
def __init__(self, capture):
|
||||
# Check if capture file is already opened
|
||||
if isinstance(capture, str):
|
||||
print("[i] Opening capture file '%s'..." % capture)
|
||||
log.info("Opening capture file '%s'..." % capture)
|
||||
self.f = open(capture, "a+b")
|
||||
else:
|
||||
self.f = capture
|
||||
|
||||
def __del__(self):
|
||||
print("[i] Closing the capture file")
|
||||
# FIXME: this causes an Exception in Python 2 (but not in Python 3)
|
||||
# AttributeError: 'NoneType' object has no attribute 'info'
|
||||
log.info("Closing the capture file")
|
||||
self.f.close()
|
||||
|
||||
# Moves the file descriptor before a specified message
|
||||
|
@ -104,7 +107,7 @@ class DATADumpFile(DATADump):
|
|||
# Attempt to parse it
|
||||
rc = self.parse_hdr(hdr_raw)
|
||||
if rc is False:
|
||||
print("[!] Couldn't parse a message header")
|
||||
log.error("Couldn't parse a message header")
|
||||
return False
|
||||
|
||||
# Expand the header
|
||||
|
@ -129,7 +132,7 @@ class DATADumpFile(DATADump):
|
|||
# Attempt to parse it
|
||||
rc = self.parse_hdr(hdr_raw)
|
||||
if rc is False:
|
||||
print("[!] Couldn't parse a message header")
|
||||
log.error("Couldn't parse a message header")
|
||||
return None
|
||||
|
||||
# Expand the header
|
||||
|
@ -138,7 +141,7 @@ class DATADumpFile(DATADump):
|
|||
# Attempt to read a message
|
||||
msg_raw = self.f.read(msg_len)
|
||||
if len(msg_raw) != msg_len:
|
||||
print("[!] Message length mismatch")
|
||||
log.error("Message length mismatch")
|
||||
return None
|
||||
|
||||
# Attempt to parse a message
|
||||
|
@ -146,7 +149,7 @@ class DATADumpFile(DATADump):
|
|||
msg_raw = bytearray(msg_raw)
|
||||
msg.parse_msg(msg_raw)
|
||||
except:
|
||||
print("[!] Couldn't parse a message, skipping...")
|
||||
log.error("Couldn't parse a message, skipping...")
|
||||
return False
|
||||
|
||||
# Success
|
||||
|
@ -161,7 +164,7 @@ class DATADumpFile(DATADump):
|
|||
# Move descriptor to the begining of requested message
|
||||
rc = self._seek2msg(idx)
|
||||
if not rc:
|
||||
print("[!] Couldn't find requested message")
|
||||
log.error("Couldn't find requested message")
|
||||
return False
|
||||
|
||||
# Attempt to parse a message
|
||||
|
@ -181,7 +184,7 @@ class DATADumpFile(DATADump):
|
|||
else:
|
||||
rc = self._seek2msg(skip)
|
||||
if not rc:
|
||||
print("[!] Couldn't find requested message")
|
||||
log.error("Couldn't find requested message")
|
||||
return False
|
||||
|
||||
# Read the capture in loop...
|
||||
|
@ -224,6 +227,10 @@ if __name__ == '__main__':
|
|||
from gsm_shared import *
|
||||
import random
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
# Create a temporary file
|
||||
tf = TemporaryFile()
|
||||
|
||||
|
@ -242,7 +249,7 @@ if __name__ == '__main__':
|
|||
burst_trx2l1.append(sbit)
|
||||
|
||||
# Generate a basic list of random messages
|
||||
print("[i] Generating the reference messages")
|
||||
log.info("Generating the reference messages")
|
||||
messages_ref = []
|
||||
|
||||
for i in range(100):
|
||||
|
@ -260,9 +267,9 @@ if __name__ == '__main__':
|
|||
# Append
|
||||
messages_ref.append(msg)
|
||||
|
||||
print("[i] Adding the following messages to the capture:")
|
||||
log.info("Adding the following messages to the capture:")
|
||||
for msg in messages_ref[:3]:
|
||||
print(" %s: burst_len=%d"
|
||||
log.info("%s: burst_len=%d"
|
||||
% (msg.desc_hdr(), len(msg.burst)))
|
||||
|
||||
# Check single message appending
|
||||
|
@ -273,9 +280,9 @@ if __name__ == '__main__':
|
|||
# Read the written messages back
|
||||
messages_check = ddf.parse_all()
|
||||
|
||||
print("[i] Read the following messages back:")
|
||||
log.info("Read the following messages back:")
|
||||
for msg in messages_check:
|
||||
print(" %s: burst_len=%d"
|
||||
log.info("%s: burst_len=%d"
|
||||
% (msg.desc_hdr(), len(msg.burst)))
|
||||
|
||||
# Expecting three messages
|
||||
|
@ -291,7 +298,7 @@ if __name__ == '__main__':
|
|||
# Validate a message
|
||||
assert(messages_check[i].validate())
|
||||
|
||||
print("[?] Check append_msg(): OK")
|
||||
log.info("Check append_msg(): OK")
|
||||
|
||||
|
||||
# Append the pending reference messages
|
||||
|
@ -313,7 +320,7 @@ if __name__ == '__main__':
|
|||
# Validate a message
|
||||
assert(messages_check[i].validate())
|
||||
|
||||
print("[?] Check append_all(): OK")
|
||||
log.info("Check append_all(): OK")
|
||||
|
||||
|
||||
# Check parse_msg()
|
||||
|
@ -336,7 +343,7 @@ if __name__ == '__main__':
|
|||
assert(msg0.validate())
|
||||
assert(msg10.validate())
|
||||
|
||||
print("[?] Check parse_msg(): OK")
|
||||
log.info("Check parse_msg(): OK")
|
||||
|
||||
|
||||
# Check parse_all() with range
|
||||
|
@ -357,4 +364,4 @@ if __name__ == '__main__':
|
|||
# Validate a message
|
||||
assert(messages_check[i].validate())
|
||||
|
||||
print("[?] Check parse_all(): OK")
|
||||
log.info("Check parse_all(): OK")
|
||||
|
|
|
@ -431,6 +431,12 @@ class DATAMSG_TRX2L1(DATAMSG):
|
|||
|
||||
# Regression test
|
||||
if __name__ == '__main__':
|
||||
import logging as log
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
# Common reference data
|
||||
fn = 1024
|
||||
tn = 0
|
||||
|
@ -446,7 +452,7 @@ if __name__ == '__main__':
|
|||
sbit = random.randint(-127, 127)
|
||||
burst_trx2l1_ref.append(sbit)
|
||||
|
||||
print("[i] Generating the reference messages")
|
||||
log.info("Generating the reference messages")
|
||||
|
||||
# Create messages of both types
|
||||
msg_l12trx_ref = DATAMSG_L12TRX(fn = fn, tn = tn)
|
||||
|
@ -461,13 +467,13 @@ if __name__ == '__main__':
|
|||
msg_l12trx_ref.burst = burst_l12trx_ref
|
||||
msg_trx2l1_ref.burst = burst_trx2l1_ref
|
||||
|
||||
print("[i] Encoding the reference messages")
|
||||
log.info("Encoding the reference messages")
|
||||
|
||||
# Encode DATA messages
|
||||
l12trx_raw = msg_l12trx_ref.gen_msg()
|
||||
trx2l1_raw = msg_trx2l1_ref.gen_msg()
|
||||
|
||||
print("[i] Parsing generated messages back")
|
||||
log.info("Parsing generated messages back")
|
||||
|
||||
# Parse generated DATA messages
|
||||
msg_l12trx_dec = DATAMSG_L12TRX()
|
||||
|
@ -475,13 +481,13 @@ if __name__ == '__main__':
|
|||
msg_l12trx_dec.parse_msg(l12trx_raw)
|
||||
msg_trx2l1_dec.parse_msg(trx2l1_raw)
|
||||
|
||||
print("[i] Comparing decoded messages with the reference")
|
||||
log.info("Comparing decoded messages with the reference")
|
||||
|
||||
# Compare bursts
|
||||
assert(msg_l12trx_dec.burst == burst_l12trx_ref)
|
||||
assert(msg_trx2l1_dec.burst == burst_trx2l1_ref)
|
||||
|
||||
print("[?] Compare bursts: OK")
|
||||
log.info("Compare bursts: OK")
|
||||
|
||||
# Compare both parsed messages with the reference data
|
||||
assert(msg_l12trx_dec.fn == fn)
|
||||
|
@ -489,14 +495,14 @@ if __name__ == '__main__':
|
|||
assert(msg_l12trx_dec.tn == tn)
|
||||
assert(msg_trx2l1_dec.tn == tn)
|
||||
|
||||
print("[?] Compare FN / TN: OK")
|
||||
log.info("Compare FN / TN: OK")
|
||||
|
||||
# Compare message specific parts
|
||||
assert(msg_trx2l1_dec.rssi == msg_trx2l1_ref.rssi)
|
||||
assert(msg_l12trx_dec.pwr == msg_l12trx_ref.pwr)
|
||||
assert(msg_trx2l1_dec.toa256 == msg_trx2l1_ref.toa256)
|
||||
|
||||
print("[?] Compare message specific data: OK")
|
||||
log.info("Compare message specific data: OK")
|
||||
|
||||
# Validate header randomization
|
||||
for i in range(0, 100):
|
||||
|
@ -506,7 +512,7 @@ if __name__ == '__main__':
|
|||
assert(msg_l12trx_ref.validate())
|
||||
assert(msg_trx2l1_ref.validate())
|
||||
|
||||
print("[?] Validate header randomization: OK")
|
||||
log.info("Validate header randomization: OK")
|
||||
|
||||
# Bit conversation test
|
||||
usbits_ref = list(range(0, 256))
|
||||
|
@ -518,7 +524,7 @@ if __name__ == '__main__':
|
|||
assert(usbits[:255] == usbits_ref[:255])
|
||||
assert(usbits[255] == 254)
|
||||
|
||||
print("[?] Check both usbit2sbit() and sbit2usbit(): OK")
|
||||
log.info("Check both usbit2sbit() and sbit2usbit(): OK")
|
||||
|
||||
# Test both sbit2ubit() and ubit2sbit()
|
||||
ubits = msg_trx2l1_ref.sbit2ubit(sbits_ref)
|
||||
|
@ -527,7 +533,7 @@ if __name__ == '__main__':
|
|||
sbits = msg_trx2l1_ref.ubit2sbit(ubits)
|
||||
assert(sbits == ([-127] * 127 + [127] * 128))
|
||||
|
||||
print("[?] Check both sbit2ubit() and ubit2sbit(): OK")
|
||||
log.info("Check both sbit2ubit() and ubit2sbit(): OK")
|
||||
|
||||
# Test message transformation
|
||||
msg_l12trx_dec = msg_trx2l1_ref.gen_l12trx()
|
||||
|
@ -542,4 +548,4 @@ if __name__ == '__main__':
|
|||
assert(msg_l12trx_dec.burst == msg_l12trx_dec.sbit2ubit(burst_trx2l1_ref))
|
||||
assert(msg_trx2l1_dec.burst == msg_trx2l1_dec.ubit2sbit(burst_l12trx_ref))
|
||||
|
||||
print("[?] Check L12TRX <-> TRX2L1 type transformations: OK")
|
||||
log.info("Check L12TRX <-> TRX2L1 type transformations: OK")
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
from copyright import print_copyright
|
||||
CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
|
||||
|
||||
import logging as log
|
||||
import signal
|
||||
import getopt
|
||||
import select
|
||||
|
@ -53,6 +54,10 @@ class Application:
|
|||
# Set up signal handlers
|
||||
signal.signal(signal.SIGINT, self.sig_handler)
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
def run(self):
|
||||
# Init TRX CTRL interface for BTS
|
||||
self.bts_ctrl = CTRLInterfaceBTS(self.bts_addr, self.bts_base_port + 101,
|
||||
|
@ -90,7 +95,7 @@ class Application:
|
|||
self.clck_gen = CLCKGen([self.bts_clck])
|
||||
self.bts_ctrl.clck_gen = self.clck_gen
|
||||
|
||||
print("[i] Init complete")
|
||||
log.info("Init complete")
|
||||
|
||||
# Enter main loop
|
||||
while True:
|
||||
|
@ -119,7 +124,7 @@ class Application:
|
|||
self.bb_ctrl.handle_rx(data.decode(), addr)
|
||||
|
||||
def shutdown(self):
|
||||
print("[i] Shutting down...")
|
||||
log.info("Shutting down...")
|
||||
|
||||
# Stop clock generator
|
||||
self.clck_gen.stop()
|
||||
|
@ -198,7 +203,7 @@ class Application:
|
|||
sys.exit(2)
|
||||
|
||||
def sig_handler(self, signum, frame):
|
||||
print("Signal %d received" % signum)
|
||||
log.info("Signal %d received" % signum)
|
||||
if signum is signal.SIGINT:
|
||||
self.shutdown()
|
||||
sys.exit(0)
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
from copyright import print_copyright
|
||||
CR_HOLDERS = [("2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
|
||||
|
||||
import logging as log
|
||||
import signal
|
||||
import getopt
|
||||
import sys
|
||||
|
@ -67,6 +68,10 @@ class Application:
|
|||
print_copyright(CR_HOLDERS)
|
||||
self.parse_argv()
|
||||
|
||||
# Configure logging
|
||||
log.basicConfig(level = log.DEBUG,
|
||||
format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
|
||||
|
||||
# Open requested capture file
|
||||
if self.output_file is not None:
|
||||
self.ddf = DATADumpFile(self.output_file)
|
||||
|
@ -76,7 +81,7 @@ class Application:
|
|||
pkt_filter = "udp and (port %d or port %d)" \
|
||||
% (self.sniff_base_port + 2, self.sniff_base_port + 102)
|
||||
|
||||
print("[i] Listening on interface '%s'..." % self.sniff_interface)
|
||||
log.info("Listening on interface '%s'..." % self.sniff_interface)
|
||||
|
||||
# Start sniffing...
|
||||
scapy.all.sniff(iface = self.sniff_interface, store = 0,
|
||||
|
@ -110,7 +115,7 @@ class Application:
|
|||
try:
|
||||
msg.parse_msg(msg_raw)
|
||||
except:
|
||||
print("[!] Failed to parse message, dropping...")
|
||||
log.warning("Failed to parse message, dropping...")
|
||||
self.cnt_burst_dropped_num += 1
|
||||
return
|
||||
|
||||
|
@ -121,7 +126,7 @@ class Application:
|
|||
return
|
||||
|
||||
# Debug print
|
||||
print("[i] %s burst: %s" \
|
||||
log.debug("%s burst: %s" \
|
||||
% ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
|
||||
|
||||
# Poke message handler
|
||||
|
@ -177,22 +182,22 @@ class Application:
|
|||
# Stop sniffing after N bursts
|
||||
if self.cnt_burst_break is not None:
|
||||
if self.cnt_burst_num == self.cnt_burst_break:
|
||||
print("[i] Collected required amount of bursts")
|
||||
log.info("Collected required amount of bursts")
|
||||
return True
|
||||
|
||||
# Stop sniffing after N frames
|
||||
if self.cnt_frame_break is not None:
|
||||
if self.cnt_frame_num == self.cnt_frame_break:
|
||||
print("[i] Collected required amount of frames")
|
||||
log.info("Collected required amount of frames")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def shutdown(self):
|
||||
print("[i] Shutting down...")
|
||||
log.info("Shutting down...")
|
||||
|
||||
# Print statistics
|
||||
print("[i] %u bursts handled, %u dropped" \
|
||||
log.info("%u bursts handled, %u dropped" \
|
||||
% (self.cnt_burst_num, self.cnt_burst_dropped_num))
|
||||
|
||||
# Exit
|
||||
|
|
Loading…
Reference in New Issue