gr-osmosdr/apps/osmocom_fft

515 lines
19 KiB
Python
Executable File

#!/usr/bin/env python
#
# Copyright 2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
SAMP_RANGE_KEY = 'samp_range'
SAMP_RATE_KEY = 'samp_rate'
GAIN_KEY = lambda x: 'gain:'+x
BWIDTH_KEY = 'bwidth'
CENTER_FREQ_KEY = 'center_freq'
FREQ_CORR_KEY = 'freq_corr'
FREQ_RANGE_KEY = 'freq_range'
GAIN_RANGE_KEY = lambda x: 'gain_range:'+x
BWIDTH_RANGE_KEY = 'bwidth_range'
import osmosdr
from gnuradio import gr, gru
from gnuradio import eng_notation
from gnuradio.gr.pubsub import pubsub
from gnuradio.eng_option import eng_option
from optparse import OptionParser
import sys
import numpy
try:
from gnuradio.wxgui import stdgui2, form, slider
from gnuradio.wxgui import forms
from gnuradio.wxgui import fftsink2, waterfallsink2, scopesink2
import wx
except ImportError:
sys.stderr.write("Error importing GNU Radio's wxgui. Please make sure gr-wxgui is installed.\n")
sys.exit(1)
class app_top_block(stdgui2.std_top_block, pubsub):
def __init__(self, frame, panel, vbox, argv):
stdgui2.std_top_block.__init__(self, frame, panel, vbox, argv)
pubsub.__init__(self)
self.frame = frame
self.panel = panel
parser = OptionParser(option_class=eng_option)
parser.add_option("-a", "--args", type="string", default="",
help="Device args, [default=%default]")
parser.add_option("-A", "--antenna", type="string", default=None,
help="Select RX antenna where appropriate")
parser.add_option("-s", "--samp-rate", type="eng_float", default=None,
help="Set sample rate (bandwidth), minimum by default")
parser.add_option("-f", "--center-freq", type="eng_float", default=None,
help="Set frequency to FREQ", metavar="FREQ")
parser.add_option("-c", "--freq-corr", type="eng_float", default=0,
help="Set frequency correction (ppm)")
parser.add_option("-g", "--gain", type="eng_float", default=None,
help="Set gain in dB (default is midpoint)")
parser.add_option("-W", "--waterfall", action="store_true", default=False,
help="Enable waterfall display")
parser.add_option("-F", "--fosphor", action="store_true", default=False,
help="Enable fosphor display")
parser.add_option("-S", "--oscilloscope", action="store_true", default=False,
help="Enable oscilloscope display")
parser.add_option("", "--avg-alpha", type="eng_float", default=1e-1,
help="Set fftsink averaging factor, default=[%default]")
parser.add_option ("", "--averaging", action="store_true", default=False,
help="Enable fftsink averaging, default=[%default]")
parser.add_option("", "--ref-scale", type="eng_float", default=1.0,
help="Set dBFS=0dB input value, default=[%default]")
parser.add_option("", "--fft-size", type="int", default=1024,
help="Set number of FFT bins [default=%default]")
parser.add_option("", "--fft-rate", type="int", default=30,
help="Set FFT update rate, [default=%default]")
parser.add_option("-v", "--verbose", action="store_true", default=False,
help="Use verbose console output [default=%default]")
(options, args) = parser.parse_args()
if len(args) != 0:
parser.print_help()
sys.exit(1)
self.options = options
self._verbose = options.verbose
self.src = osmosdr.source(options.args)
try:
self.src.get_sample_rates().start()
except RuntimeError:
print "Source has no sample rates (wrong device arguments?)."
sys.exit(1)
# Set the antenna
if(options.antenna):
self.src.set_antenna(options.antenna)
if options.samp_rate is None:
options.samp_rate = self.src.get_sample_rates().start()
if options.gain is None:
gain = self.src.get_gain()
if gain is None:
# if no gain was specified, use the mid-point in dB
r = self.src.get_gain_range()
try: # empty gain range returned in file= mode
options.gain = float(r.start()+r.stop())/2
except RuntimeError:
options.gain = 0
pass
else:
options.gain = gain
if options.center_freq is None:
freq = self.src.get_center_freq()
if freq != 0:
options.center_freq = freq
else:
# if no freq was specified, use the mid-point in Hz
r = self.src.get_freq_range()
options.center_freq = float(r.start()+r.stop())/2
input_rate = self.src.set_sample_rate(options.samp_rate)
self.src.set_bandwidth(input_rate)
self.src.set_gain(options.gain)
self.publish(SAMP_RANGE_KEY, self.src.get_sample_rates)
self.publish(FREQ_RANGE_KEY, self.src.get_freq_range)
for name in self.get_gain_names():
self.publish(GAIN_RANGE_KEY(name), (lambda self=self,name=name: self.src.get_gain_range(name)))
self.publish(BWIDTH_RANGE_KEY, self.src.get_bandwidth_range)
for name in self.get_gain_names():
self.publish(GAIN_KEY(name), (lambda self=self,name=name: self.src.get_gain(name)))
self.publish(BWIDTH_KEY, self.src.get_bandwidth)
#initialize values from options
self[SAMP_RANGE_KEY] = self.src.get_sample_rates()
self[SAMP_RATE_KEY] = options.samp_rate
self[CENTER_FREQ_KEY] = options.center_freq
self[FREQ_CORR_KEY] = options.freq_corr
#subscribe set methods
self.subscribe(SAMP_RATE_KEY, self.set_sample_rate)
for name in self.get_gain_names():
self.subscribe(GAIN_KEY(name), (lambda gain,self=self,name=name: self.set_named_gain(gain, name)))
self.subscribe(BWIDTH_KEY, self.set_bandwidth)
self.subscribe(CENTER_FREQ_KEY, self.set_freq)
self.subscribe(FREQ_CORR_KEY, self.set_freq_corr)
#force update on pubsub keys
#for key in (SAMP_RATE_KEY, BWIDTH_KEY, CENTER_FREQ_KEY, FREQ_CORR_KEY):
#print key, "=", self[key]
#self[key] = self[key]
if options.fosphor:
from gnuradio import fosphor
self.scope = fosphor.wx_sink_c(panel, size=(800,300))
elif options.waterfall:
self.scope = waterfallsink2.waterfall_sink_c (panel,
fft_size=options.fft_size,
sample_rate=input_rate,
ref_scale=options.ref_scale,
ref_level=20.0,
y_divs = 12)
self.scope.set_callback(self.wxsink_callback)
self.frame.SetMinSize((800, 420))
elif options.oscilloscope:
self.scope = scopesink2.scope_sink_c(panel, sample_rate=input_rate)
self.frame.SetMinSize((800, 600))
else:
self.scope = fftsink2.fft_sink_c (panel,
fft_size=options.fft_size,
sample_rate=input_rate,
ref_scale=options.ref_scale,
ref_level=20.0,
y_divs = 12,
average=options.averaging,
avg_alpha=options.avg_alpha,
fft_rate=options.fft_rate)
self.scope.set_callback(self.wxsink_callback)
self.frame.SetMinSize((800, 420))
self.connect(self.src, self.scope)
self._build_gui(vbox)
# set initial values
if not(self.set_freq(options.center_freq)):
self._set_status_msg("Failed to set initial frequency")
def wxsink_callback(self, x, y):
self.set_freq_from_callback(x)
def _set_status_msg(self, msg):
self.frame.GetStatusBar().SetStatusText(msg, 0)
def _build_gui(self, vbox):
if hasattr(self.scope, 'win'):
vbox.Add(self.scope.win, 0, wx.EXPAND)
vbox.AddSpacer(3)
# add control area at the bottom
self.myform = myform = form.form()
##################################################
# Frequency controls
##################################################
fc_vbox = forms.static_box_sizer(parent=self.panel,
label="Center Frequency",
orient=wx.VERTICAL,
bold=True)
fc_vbox.AddSpacer(3)
# First row of frequency controls (center frequency)
freq_hbox = wx.BoxSizer(wx.HORIZONTAL)
fc_vbox.Add(freq_hbox, 0, wx.EXPAND)
fc_vbox.AddSpacer(5)
# Second row of frequency controls (freq. correction)
corr_hbox = wx.BoxSizer(wx.HORIZONTAL)
fc_vbox.Add(corr_hbox, 0, wx.EXPAND)
fc_vbox.AddSpacer(3)
# Add frequency controls to top window sizer
vbox.Add(fc_vbox, 0, wx.EXPAND)
vbox.AddSpacer(5)
vbox.AddStretchSpacer()
freq_hbox.AddSpacer(3)
forms.text_box(
parent=self.panel, sizer=freq_hbox,
label='Center Frequency (Hz)',
proportion=1,
converter=forms.float_converter(),
ps=self,
key=CENTER_FREQ_KEY,
)
freq_hbox.AddSpacer(5)
try: # range.start() == range.stop() in file= mode
forms.slider(
parent=self.panel, sizer=freq_hbox,
proportion=3,
ps=self,
key=CENTER_FREQ_KEY,
minimum=self[FREQ_RANGE_KEY].start(),
maximum=self[FREQ_RANGE_KEY].stop(),
num_steps=1000,
)
freq_hbox.AddSpacer(3)
except AssertionError:
pass
corr_hbox.AddSpacer(3)
forms.text_box(
parent=self.panel, sizer=corr_hbox,
label='Freq. Correction (ppm)',
proportion=1,
converter=forms.float_converter(),
ps=self,
key=FREQ_CORR_KEY,
)
corr_hbox.AddSpacer(5)
forms.slider(
parent=self.panel, sizer=corr_hbox,
proportion=3,
ps=self,
key=FREQ_CORR_KEY,
minimum=-100,
maximum=+100,
num_steps=2010,
step_size=0.1,
)
corr_hbox.AddSpacer(3)
##################################################
# Gain controls
##################################################
gc_vbox = forms.static_box_sizer(parent=self.panel,
label="Gain Settings",
orient=wx.VERTICAL,
bold=True)
gc_vbox.AddSpacer(3)
# Add gain controls to top window sizer
vbox.Add(gc_vbox, 0, wx.EXPAND)
vbox.AddSpacer(5)
vbox.AddStretchSpacer()
for gain_name in self.get_gain_names():
range = self[GAIN_RANGE_KEY(gain_name)]
gain = self[GAIN_KEY(gain_name)]
#print gain_name, gain, range.to_pp_string()
if range.start() < range.stop():
gain_hbox = wx.BoxSizer(wx.HORIZONTAL)
gc_vbox.Add(gain_hbox, 0, wx.EXPAND)
gc_vbox.AddSpacer(3)
gain_hbox.AddSpacer(3)
forms.text_box(
parent=self.panel, sizer=gain_hbox,
proportion=1,
converter=forms.float_converter(),
ps=self,
key=GAIN_KEY(gain_name),
label=gain_name + " Gain (dB)",
)
gain_hbox.AddSpacer(5)
forms.slider(
parent=self.panel, sizer=gain_hbox,
proportion=3,
ps=self,
key=GAIN_KEY(gain_name),
minimum=range.start(),
maximum=range.stop(),
step_size=range.step(),
)
gain_hbox.AddSpacer(3)
##################################################
# Bandwidth controls
##################################################
try:
bw_range = self[BWIDTH_RANGE_KEY]
#print bw_range.to_pp_string()
if bw_range.start() < bw_range.stop():
bwidth_vbox = forms.static_box_sizer(parent=self.panel,
label="Bandwidth",
orient=wx.VERTICAL,
bold=True)
bwidth_vbox.AddSpacer(3)
bwidth_hbox = wx.BoxSizer(wx.HORIZONTAL)
bwidth_vbox.Add(bwidth_hbox, 0, wx.EXPAND)
bwidth_vbox.AddSpacer(3)
vbox.Add(bwidth_vbox, 0, wx.EXPAND)
vbox.AddSpacer(5)
vbox.AddStretchSpacer()
bwidth_hbox.AddSpacer(3)
forms.text_box(
parent=self.panel, sizer=bwidth_hbox,
proportion=1,
converter=forms.float_converter(),
ps=self,
key=BWIDTH_KEY,
label="Bandwidth (Hz)",
)
bwidth_hbox.AddSpacer(5)
forms.slider(
parent=self.panel, sizer=bwidth_hbox,
proportion=3,
ps=self,
key=BWIDTH_KEY,
minimum=bw_range.start(),
maximum=bw_range.stop(),
step_size=bw_range.step(),
)
bwidth_hbox.AddSpacer(3)
except RuntimeError:
pass
##################################################
# Sample rate controls
##################################################
sr_vbox = forms.static_box_sizer(parent=self.panel,
label="Sample Rate",
orient=wx.VERTICAL,
bold=True)
sr_vbox.AddSpacer(3)
# First row of sample rate controls
sr_hbox = wx.BoxSizer(wx.HORIZONTAL)
sr_vbox.Add(sr_hbox, 0, wx.EXPAND)
sr_vbox.AddSpacer(5)
# Add frequency controls to top window sizer
vbox.Add(sr_vbox, 0, wx.EXPAND)
vbox.AddSpacer(5)
vbox.AddStretchSpacer()
sr_hbox.AddSpacer(3)
forms.text_box(
parent=self.panel, sizer=sr_hbox,
label='Sample Rate (Hz)',
proportion=1,
converter=forms.float_converter(),
ps=self,
key=SAMP_RATE_KEY,
)
sr_hbox.AddSpacer(5)
#forms.slider(
# parent=self.panel, sizer=sr_hbox,
# proportion=3,
# ps=self,
# key=SAMP_RATE_KEY,
# minimum=self[SAMP_RANGE_KEY].start(),
# maximum=self[SAMP_RANGE_KEY].stop(),
# step_size=self[SAMP_RANGE_KEY].step(),
#)
#sr_hbox.AddSpacer(3)
def set_sample_rate(self, samp_rate):
samp_rate = self.src.set_sample_rate(samp_rate)
if hasattr(self.scope, 'set_sample_rate'):
self.scope.set_sample_rate(samp_rate)
if self._verbose:
print "Set sample rate to:", samp_rate
try:
self[BWIDTH_KEY] = self.set_bandwidth(samp_rate)
except RuntimeError:
pass
return samp_rate
def get_gain_names(self):
return self.src.get_gain_names()
def set_named_gain(self, gain, name):
if gain is None:
g = self[GAIN_RANGE_KEY(name)]
gain = float(g.start()+g.stop())/2
if self._verbose:
print "Using auto-calculated mid-point gain"
self[GAIN_KEY(name)] = gain
return
gain = self.src.set_gain(gain, name)
if self._verbose:
print "Set " + name + " gain to:", gain
def set_bandwidth(self, bw):
clipped_bw = self[BWIDTH_RANGE_KEY].clip(bw)
if self.src.get_bandwidth() != clipped_bw:
bw = self.src.set_bandwidth(clipped_bw)
if self._verbose:
print "Set bandwidth to:", bw
return bw
def set_freq_from_callback(self, freq):
freq = self.src.set_center_freq(freq)
self[CENTER_FREQ_KEY] = freq;
def set_freq(self, freq):
if freq is None:
f = self[FREQ_RANGE_KEY]
freq = float(f.start()+f.stop())/2.0
if self._verbose:
print "Using auto-calculated mid-point frequency"
self[CENTER_FREQ_KEY] = freq
return
freq = self.src.set_center_freq(freq)
if hasattr(self.scope, 'set_baseband_freq'):
self.scope.set_baseband_freq(freq)
if freq is not None:
if self._verbose:
print "Set center frequency to", freq
elif self._verbose:
print "Failed to set freq."
return freq
def set_freq_corr(self, ppm):
if ppm is None:
ppm = 0.0
if self._verbose:
print "Using frequency corrrection of", ppm
self[FREQ_CORR_KEY] = ppm
return
ppm = self.src.set_freq_corr(ppm)
if self._verbose:
print "Set frequency correction to:", ppm
def main ():
app = stdgui2.stdapp(app_top_block, "osmocom Spectrum Browser", nstatus=1)
app.MainLoop()
if __name__ == '__main__':
main ()