150 lines
5.3 KiB
Python
Executable File
150 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python
|
|
##################################################
|
|
# Gnuradio Python Flow Graph
|
|
# Title: Airprobe File
|
|
# Generated: Sat Dec 6 15:32:40 2014
|
|
##################################################
|
|
|
|
from gnuradio import blocks
|
|
from gnuradio import eng_notation
|
|
from gnuradio import gr
|
|
from gnuradio.eng_option import eng_option
|
|
from gnuradio.filter import firdes
|
|
from optparse import OptionParser
|
|
import gsm
|
|
|
|
class airprobe_file(gr.top_block):
|
|
|
|
def __init__(self, input_file_name="input.cfile", fc=937e6, samp_rate=1e6):
|
|
gr.top_block.__init__(self, "Airprobe File")
|
|
|
|
##################################################
|
|
# Parameters
|
|
##################################################
|
|
self.input_file_name = input_file_name
|
|
self.fc = fc
|
|
self.samp_rate = samp_rate
|
|
|
|
##################################################
|
|
# Variables
|
|
##################################################
|
|
self.SDCCH = SDCCH = 6
|
|
self.RACH = RACH = 3
|
|
self.PCH = PCH = 5
|
|
self.CHANNEL_UNKNOWN = CHANNEL_UNKNOWN = 0
|
|
self.CCCH = CCCH = 2
|
|
self.BCCH = BCCH = 1
|
|
self.AGCH = AGCH = 4
|
|
|
|
##################################################
|
|
# Blocks
|
|
##################################################
|
|
self.gsm_universal_ctrl_chans_demapper_0 = gsm.universal_ctrl_chans_demapper(([2,6,12,16,22,26,32,36,42,46]), ([BCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH]))
|
|
self.gsm_receiver_0 = gsm.receiver(4, ([0]), ([]))
|
|
self.gsm_message_printer_0 = gsm.message_printer()
|
|
self.gsm_input_0 = gsm.gsm_input(
|
|
ppm=0,
|
|
osr=4,
|
|
fc=fc,
|
|
samp_rate_in=samp_rate,
|
|
)
|
|
self.gsm_control_channels_decoder_0 = gsm.control_channels_decoder()
|
|
self.gsm_clock_offset_control_0 = gsm.clock_offset_control(fc)
|
|
self.blocks_socket_pdu_0 = blocks.socket_pdu("UDP_CLIENT", "127.0.0.1", "4729", 10000)
|
|
self.blocks_file_source_0 = blocks.file_source(gr.sizeof_gr_complex*1, input_file_name, False)
|
|
|
|
##################################################
|
|
# Connections
|
|
##################################################
|
|
self.connect((self.gsm_input_0, 0), (self.gsm_receiver_0, 0))
|
|
self.connect((self.blocks_file_source_0, 0), (self.gsm_input_0, 0))
|
|
|
|
##################################################
|
|
# Asynch Message Connections
|
|
##################################################
|
|
self.msg_connect(self.gsm_receiver_0, "measurements", self.gsm_clock_offset_control_0, "measurements")
|
|
self.msg_connect(self.gsm_control_channels_decoder_0, "msgs", self.gsm_message_printer_0, "msgs")
|
|
self.msg_connect(self.gsm_universal_ctrl_chans_demapper_0, "bursts", self.gsm_control_channels_decoder_0, "bursts")
|
|
self.msg_connect(self.gsm_receiver_0, "C0", self.gsm_universal_ctrl_chans_demapper_0, "bursts")
|
|
self.msg_connect(self.gsm_clock_offset_control_0, "ppm", self.gsm_input_0, "ppm_in")
|
|
self.msg_connect(self.gsm_control_channels_decoder_0, "msgs", self.blocks_socket_pdu_0, "pdus")
|
|
|
|
# QT sink close method reimplementation
|
|
|
|
def get_input_file_name(self):
|
|
return self.input_file_name
|
|
|
|
def set_input_file_name(self, input_file_name):
|
|
self.input_file_name = input_file_name
|
|
self.blocks_file_source_0.open(self.input_file_name, False)
|
|
|
|
def get_fc(self):
|
|
return self.fc
|
|
|
|
def set_fc(self, fc):
|
|
self.fc = fc
|
|
self.gsm_input_0.set_fc(self.fc)
|
|
|
|
def get_samp_rate(self):
|
|
return self.samp_rate
|
|
|
|
def set_samp_rate(self, samp_rate):
|
|
self.samp_rate = samp_rate
|
|
self.gsm_input_0.set_samp_rate_in(self.samp_rate)
|
|
|
|
def get_SDCCH(self):
|
|
return self.SDCCH
|
|
|
|
def set_SDCCH(self, SDCCH):
|
|
self.SDCCH = SDCCH
|
|
|
|
def get_RACH(self):
|
|
return self.RACH
|
|
|
|
def set_RACH(self, RACH):
|
|
self.RACH = RACH
|
|
|
|
def get_PCH(self):
|
|
return self.PCH
|
|
|
|
def set_PCH(self, PCH):
|
|
self.PCH = PCH
|
|
|
|
def get_CHANNEL_UNKNOWN(self):
|
|
return self.CHANNEL_UNKNOWN
|
|
|
|
def set_CHANNEL_UNKNOWN(self, CHANNEL_UNKNOWN):
|
|
self.CHANNEL_UNKNOWN = CHANNEL_UNKNOWN
|
|
|
|
def get_CCCH(self):
|
|
return self.CCCH
|
|
|
|
def set_CCCH(self, CCCH):
|
|
self.CCCH = CCCH
|
|
|
|
def get_BCCH(self):
|
|
return self.BCCH
|
|
|
|
def set_BCCH(self, BCCH):
|
|
self.BCCH = BCCH
|
|
|
|
def get_AGCH(self):
|
|
return self.AGCH
|
|
|
|
def set_AGCH(self, AGCH):
|
|
self.AGCH = AGCH
|
|
|
|
if __name__ == '__main__':
|
|
parser = OptionParser(option_class=eng_option, usage="%prog: [options]")
|
|
parser.add_option("-i", "--input-file-name", dest="input_file_name", type="string", default="input.cfile",
|
|
help="Set input.cfile [default=%default]")
|
|
parser.add_option("-f", "--fc", dest="fc", type="eng_float", default=eng_notation.num_to_str(937e6),
|
|
help="Set fc [default=%default]")
|
|
parser.add_option("-s", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(1e6),
|
|
help="Set samp_rate [default=%default]")
|
|
(options, args) = parser.parse_args()
|
|
tb = airprobe_file(input_file_name=options.input_file_name, fc=options.fc, samp_rate=options.samp_rate)
|
|
tb.start()
|
|
tb.wait()
|
|
|