gr-gsm/apps/airprobe_file.py

148 lines
5.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python
##################################################
# Gnuradio Python Flow Graph
# Title: Airprobe File
# Generated: Sat Dec 13 09:44:41 2014
##################################################
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
import grgsm
class airprobe_file(gr.top_block):
def __init__(self, input_file_name="input.cfile", fc=937e6, samp_rate=1e6):
gr.top_block.__init__(self, "Airprobe File")
##################################################
# Parameters
##################################################
self.input_file_name = input_file_name
self.fc = fc
self.samp_rate = samp_rate
##################################################
# Variables
##################################################
self.SDCCH = SDCCH = 6
self.RACH = RACH = 3
self.PCH = PCH = 5
self.CHANNEL_UNKNOWN = CHANNEL_UNKNOWN = 0
self.CCCH = CCCH = 2
self.BCCH = BCCH = 1
self.AGCH = AGCH = 4
##################################################
# Blocks
##################################################
self.gsm_universal_ctrl_chans_demapper_0 = grgsm.universal_ctrl_chans_demapper(([2,6,12,16,22,26,32,36,42,46]), ([BCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH]))
self.gsm_receiver_0 = grgsm.receiver(4, ([0]), ([]))
self.gsm_message_printer_0 = grgsm.message_printer()
self.gsm_input_0 = grgsm.gsm_input(
ppm=0,
osr=4,
fc=fc,
samp_rate_in=samp_rate,
)
self.gsm_control_channels_decoder_0 = gsm.control_channels_decoder()
self.gsm_clock_offset_control_0 = gsm.clock_offset_control(fc)
self.blocks_socket_pdu_0 = blocks.socket_pdu("UDP_CLIENT", "127.0.0.1", "4729", 10000)
self.blocks_file_source_0 = blocks.file_source(gr.sizeof_gr_complex*1, input_file_name, False)
##################################################
# Connections
##################################################
self.connect((self.gsm_input_0, 0), (self.gsm_receiver_0, 0))
self.connect((self.blocks_file_source_0, 0), (self.gsm_input_0, 0))
##################################################
# Asynch Message Connections
##################################################
self.msg_connect(self.gsm_receiver_0, "measurements", self.gsm_clock_offset_control_0, "measurements")
self.msg_connect(self.gsm_control_channels_decoder_0, "msgs", self.gsm_message_printer_0, "msgs")
self.msg_connect(self.gsm_universal_ctrl_chans_demapper_0, "bursts", self.gsm_control_channels_decoder_0, "bursts")
self.msg_connect(self.gsm_receiver_0, "C0", self.gsm_universal_ctrl_chans_demapper_0, "bursts")
self.msg_connect(self.gsm_clock_offset_control_0, "ppm", self.gsm_input_0, "ppm_in")
self.msg_connect(self.gsm_control_channels_decoder_0, "msgs", self.blocks_socket_pdu_0, "pdus")
def get_input_file_name(self):
return self.input_file_name
def set_input_file_name(self, input_file_name):
self.input_file_name = input_file_name
self.blocks_file_source_0.open(self.input_file_name, False)
def get_fc(self):
return self.fc
def set_fc(self, fc):
self.fc = fc
self.gsm_input_0.set_fc(self.fc)
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.gsm_input_0.set_samp_rate_in(self.samp_rate)
def get_SDCCH(self):
return self.SDCCH
def set_SDCCH(self, SDCCH):
self.SDCCH = SDCCH
def get_RACH(self):
return self.RACH
def set_RACH(self, RACH):
self.RACH = RACH
def get_PCH(self):
return self.PCH
def set_PCH(self, PCH):
self.PCH = PCH
def get_CHANNEL_UNKNOWN(self):
return self.CHANNEL_UNKNOWN
def set_CHANNEL_UNKNOWN(self, CHANNEL_UNKNOWN):
self.CHANNEL_UNKNOWN = CHANNEL_UNKNOWN
def get_CCCH(self):
return self.CCCH
def set_CCCH(self, CCCH):
self.CCCH = CCCH
def get_BCCH(self):
return self.BCCH
def set_BCCH(self, BCCH):
self.BCCH = BCCH
def get_AGCH(self):
return self.AGCH
def set_AGCH(self, AGCH):
self.AGCH = AGCH
if __name__ == '__main__':
parser = OptionParser(option_class=eng_option, usage="%prog: [options]")
parser.add_option("-i", "--input-file-name", dest="input_file_name", type="string", default="input.cfile",
help="Set input.cfile [default=%default]")
parser.add_option("-f", "--fc", dest="fc", type="eng_float", default=eng_notation.num_to_str(937e6),
help="Set fc [default=%default]")
parser.add_option("-s", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(1e6),
help="Set samp_rate [default=%default]")
(options, args) = parser.parse_args()
tb = airprobe_file(input_file_name=options.input_file_name, fc=options.fc, samp_rate=options.samp_rate)
tb.start()
tb.wait()