From 060b2729d1efc6ad75fa45ea86ed282dd0fecaec Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Fri, 7 Dec 2018 08:01:37 +0700 Subject: [PATCH] trx_toolkit/trx_sniff.py: migrate from getopt to argparse Change-Id: Id1dacaa32134bfa68344e6c48310390cdd85cdc9 --- src/target/trx_toolkit/trx_sniff.py | 176 ++++++++++------------------ 1 file changed, 60 insertions(+), 116 deletions(-) diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py index 98509839e..9fb567ed0 100755 --- a/src/target/trx_toolkit/trx_sniff.py +++ b/src/target/trx_toolkit/trx_sniff.py @@ -27,7 +27,7 @@ CR_HOLDERS = [("2018", "Vadim Yanitskiy ")] import logging as log import signal -import getopt +import argparse import sys import scapy.all @@ -36,55 +36,37 @@ from data_dump import DATADumpFile from data_msg import * class Application: - # Application variables - sniff_interface = "lo" - sniff_base_port = 5700 - print_bursts = False - output_file = None - # Counters cnt_burst_dropped_num = 0 - cnt_burst_break = None cnt_burst_num = 0 - cnt_frame_break = None cnt_frame_last = None cnt_frame_num = 0 - # Burst direction fliter - bf_dir_l12trx = None - - # Timeslot number filter - bf_tn_val = None - - # Frame number fliter - bf_fn_lt = None - bf_fn_gt = None - # Internal variables lo_trigger = False def __init__(self): print_copyright(CR_HOLDERS) - self.parse_argv() + self.argv = self.parse_argv() # Configure logging log.basicConfig(level = log.DEBUG, format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") # Open requested capture file - if self.output_file is not None: - self.ddf = DATADumpFile(self.output_file) + if self.argv.output_file is not None: + self.ddf = DATADumpFile(self.argv.output_file) def run(self): # Compose a packet filter pkt_filter = "udp and (port %d or port %d)" \ - % (self.sniff_base_port + 2, self.sniff_base_port + 102) + % (self.argv.base_port + 2, self.argv.base_port + 102) - log.info("Listening on interface '%s'..." % self.sniff_interface) + log.info("Listening on interface '%s'..." % self.argv.sniff_if) # Start sniffing... - scapy.all.sniff(iface = self.sniff_interface, store = 0, + scapy.all.sniff(iface = self.argv.sniff_if, store = 0, filter = pkt_filter, prn = self.pkt_handler) # Scapy registers its own signal handler @@ -92,7 +74,7 @@ class Application: def pkt_handler(self, ether): # Prevent loopback packet duplication - if self.sniff_interface == "lo": + if self.argv.sniff_if == "lo": self.lo_trigger = not self.lo_trigger if not self.lo_trigger: return @@ -139,32 +121,34 @@ class Application: def burst_pass_filter(self, l12trx, fn, tn): # Direction filter - if self.bf_dir_l12trx is not None: - if l12trx != self.bf_dir_l12trx: + if self.argv.direction is not None: + if self.argv.direction == "TRX" and not l12trx: + return False + elif self.argv.direction == "L1" and l12trx: return False # Timeslot filter - if self.bf_tn_val is not None: - if tn != self.bf_tn_val: + if self.argv.pf_tn is not None: + if tn != self.argv.pf_tn: return False # Frame number filter - if self.bf_fn_lt is not None: - if fn > self.bf_fn_lt: + if self.argv.pf_fn_lt is not None: + if fn > self.argv.pf_fn_lt: return False - if self.bf_fn_gt is not None: - if fn < self.bf_fn_gt: + if self.argv.pf_fn_gt is not None: + if fn < self.argv.pf_fn_gt: return False # Burst passed ;) return True def msg_handle(self, msg): - if self.print_bursts: + if self.argv.verbose: print(msg.burst) # Append a new message to the capture - if self.output_file is not None: + if self.argv.output_file is not None: self.ddf.append_msg(msg) def burst_count(self, fn, tn): @@ -180,14 +164,14 @@ class Application: self.cnt_burst_num += 1 # Stop sniffing after N bursts - if self.cnt_burst_break is not None: - if self.cnt_burst_num == self.cnt_burst_break: + if self.argv.burst_count is not None: + if self.cnt_burst_num == self.argv.burst_count: log.info("Collected required amount of bursts") return True # Stop sniffing after N frames - if self.cnt_frame_break is not None: - if self.cnt_frame_num == self.cnt_frame_break: + if self.argv.frame_count is not None: + if self.cnt_frame_num == self.argv.frame_count: log.info("Collected required amount of frames") return True @@ -203,88 +187,48 @@ class Application: # Exit sys.exit(0) - def print_help(self, msg = None): - s = " Usage: " + sys.argv[0] + " [options]\n\n" \ - " Some help...\n" \ - " -h --help this text\n\n" - - s += " Sniffing options\n" \ - " -i --sniff-interface Set network interface (default '%s')\n" \ - " -p --sniff-base-port Set base port number (default %d)\n\n" - - s += " Processing (no processing by default)\n" \ - " -o --output-file Write bursts to file\n" \ - " -v --print-bits Print burst bits to stdout\n\n" \ - - s += " Count limitations (disabled by default)\n" \ - " --frame-count NUM Stop after sniffing NUM frames\n" \ - " --burst-count NUM Stop after sniffing NUM bursts\n\n" - - s += " Filtering (disabled by default)\n" \ - " --direction DIR Burst direction: L12TRX or TRX2L1\n" \ - " --timeslot NUM TDMA timeslot number [0..7]\n" \ - " --frame-num-lt NUM TDMA frame number lower than NUM\n" \ - " --burst-num-gt NUM TDMA frame number greater than NUM\n" - - print(s % (self.sniff_interface, self.sniff_base_port)) - - if msg is not None: - print(msg) - def parse_argv(self): - try: - opts, args = getopt.getopt(sys.argv[1:], - "i:p:o:v:h", ["help", "sniff-interface=", "sniff-base-port=", - "frame-count=", "burst-count=", "direction=", - "timeslot=", "frame-num-lt=", "frame-num-gt=", - "output-file=", "print-bits"]) - except getopt.GetoptError as err: - self.print_help("[!] " + str(err)) - sys.exit(2) + parser = argparse.ArgumentParser(prog = "trx_sniff", + description = "Scapy-based TRX interface sniffer") - for o, v in opts: - if o in ("-h", "--help"): - self.print_help() - sys.exit(2) + parser.add_argument("-v", "--verbose", + dest = "verbose", action = "store_true", + help = "Print burst bits to stdout") - elif o in ("-i", "--sniff-interface"): - self.sniff_interface = v - elif o in ("-p", "--sniff-base-port"): - self.sniff_base_port = int(v) + trx_group = parser.add_argument_group("TRX interface") + trx_group.add_argument("-i", "--sniff-interface", + dest = "sniff_if", type = str, default = "lo", metavar = "IF", + help = "Set network interface (default '%(default)s')") + trx_group.add_argument("-p", "--base-port", + dest = "base_port", type = int, default = 6700, + help = "Set base port number (default %(default)s)") + trx_group.add_argument("-o", "--output-file", metavar = "FILE", + dest = "output_file", type = str, + help = "Write bursts to a capture file") - elif o in ("-o", "--output-file"): - self.output_file = v - elif o in ("-v", "--print-bits"): - self.print_bursts = True + cnt_group = parser.add_argument_group("Count limitations (optional)") + cnt_group.add_argument("--frame-count", metavar = "N", + dest = "frame_count", type = int, + help = "Stop after sniffing N frames") + cnt_group.add_argument("--burst-count", metavar = "N", + dest = "burst_count", type = int, + help = "Stop after sniffing N bursts") - # Break counters - elif o == "--frame-count": - self.cnt_frame_break = int(v) - elif o == "--burst-count": - self.cnt_burst_break = int(v) + pf_group = parser.add_argument_group("Filtering (optional)") + pf_group.add_argument("--direction", + dest = "direction", type = str, choices = ["TRX", "L1"], + help = "Burst direction") + pf_group.add_argument("--timeslot", metavar = "TN", + dest = "pf_tn", type = int, choices = range(0, 8), + help = "TDMA timeslot number (equal TN)") + pf_group.add_argument("--frame-num-lt", metavar = "FN", + dest = "pf_fn_lt", type = int, + help = "TDMA frame number (lower than FN)") + pf_group.add_argument("--frame-num-gt", metavar = "FN", + dest = "pf_fn_gt", type = int, + help = "TDMA frame number (greater than FN)") - # Direction filter - elif o == "--direction": - if v == "L12TRX": - self.bf_dir_l12trx = True - elif v == "TRX2L1": - self.bf_dir_l12trx = False - else: - self.print_help("[!] Wrong direction argument") - sys.exit(2) - - # Timeslot pass filter - elif o == "--timeslot": - self.bf_tn_val = int(v) - if self.bf_tn_val < 0 or self.bf_tn_val > 7: - self.print_help("[!] Wrong timeslot value") - sys.exit(2) - - # Frame number pass filter - elif o == "--frame-num-lt": - self.bf_fn_lt = int(v) - elif o == "--frame-num-gt": - self.bf_fn_gt = int(v) + return parser.parse_args() if __name__ == '__main__': app = Application()