GNU Radio block for interfacing with various radio hardware
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
gr-osmosdr/apps/osmocom_fft

714 lines
26 KiB

#!/usr/bin/env python3
#
# Copyright 2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
import osmosdr
from gnuradio import blocks
from gnuradio import gr
from gnuradio import eng_notation
from gnuradio.fft import window
from gnuradio.eng_option import eng_option
from optparse import OptionParser
from functools import partial
import sys
import signal
import time
import datetime
try:
from PyQt5 import Qt
from gnuradio import qtgui
import sip
from gnuradio.qtgui import Range, RangeWidget
except ImportError:
sys.stderr.write("Error importing GNU Radio's Qtgui.\n")
sys.exit(1)
class CallEvent(Qt.QEvent):
"""An event containing a request for a function call."""
EVENT_TYPE = Qt.QEvent.Type(Qt.QEvent.registerEventType())
def __init__(self, fn, *args, **kwargs):
Qt.QEvent.__init__(self, self.EVENT_TYPE)
self.fn = fn
self.args = args
self.kwargs = kwargs
class freq_recv(gr.sync_block, Qt.QObject):
def __init__(self, callback):
gr.sync_block.__init__(self, name="freq_recv", in_sig=None, out_sig=None)
Qt.QObject.__init__(self)
self.set_freq=callback
# Advertise 'msg' port
self.message_port_register_in(gr.pmt.intern('msg'))
self.set_msg_handler(gr.pmt.intern('msg'), self.handle_msg)
def handle_msg(self, msg_pmt):
# Unpack message & call set_freq on main thread
meta = gr.pmt.to_python(gr.pmt.car(msg_pmt))
msg = gr.pmt.cdr(msg_pmt)
if meta=="freq":
freq = gr.pmt.to_double(msg)
Qt.QCoreApplication.postEvent(self, CallEvent(self.set_freq, freq))
def event(self, event):
event.accept()
result = event.fn(*event.args, **event.kwargs)
return True
class app_top_block(gr.top_block, Qt.QMainWindow):
def __init__(self, argv, title):
gr.top_block.__init__(self, title)
Qt.QMainWindow.__init__(self)
self.setWindowTitle(title)
parser = OptionParser(option_class=eng_option)
parser.add_option("-a", "--args", type="string", default="",
help="Device args, [default=%default]")
parser.add_option("-A", "--antenna", type="string", default=None,
help="Select RX antenna where appropriate")
parser.add_option("", "--clock-source",
help="Set the clock source; typically 'internal', 'external', 'external_1pps', 'mimo' or 'gpsdo'")
parser.add_option("-s", "--samp-rate", type="eng_float", default=None,
help="Set sample rate (bandwidth), minimum by default")
parser.add_option("-f", "--center-freq", type="eng_float", default=None,
help="Set frequency to FREQ", metavar="FREQ")
parser.add_option("-c", "--freq-corr", type="eng_float", default=None,
help="Set frequency correction (ppm)")
parser.add_option("-g", "--gain", type="eng_float", default=None,
help="Set gain in dB (default is midpoint)")
parser.add_option("-G", "--gains", type="string", default=None,
help="Set named gain in dB, name:gain,name:gain,...")
parser.add_option("-r", "--record", type="string", default="/tmp/name-f%F-s%S-t%T.cfile",
help="Filename to record to, available wildcards: %S: sample rate, %F: center frequency, %T: timestamp, Example: /tmp/name-f%F-s%S-t%T.cfile")
parser.add_option("", "--dc-offset-mode", type="int", default=None,
help="Set the RX frontend DC offset correction mode")
parser.add_option("", "--iq-balance-mode", type="int", default=None,
help="Set the RX frontend IQ imbalance correction mode")
parser.add_option("-W", "--waterfall", action="store_true", default=False,
help="Enable waterfall display")
parser.add_option("-F", "--fosphor", action="store_true", default=False,
help="Enable fosphor display")
parser.add_option("-S", "--oscilloscope", action="store_true", default=False,
help="Enable oscilloscope display")
parser.add_option("-Q", "--qtgui", action="store_true", default=False,
help="Enable QTgui 'all-in-one' display")
parser.add_option("", "--avg-alpha", type="eng_float", default=1e-1,
help="Set fftsink averaging factor, default=[%default]")
parser.add_option("", "--averaging", action="store_true", default=False,
help="Enable fftsink averaging, default=[%default]")
parser.add_option("", "--peak-hold", action="store_true", default=False,
help="Enable fftsink peak hold, default=[%default]")
parser.add_option("", "--ref-scale", type="eng_float", default=1.0,
help="Set dBFS=0dB input value, default=[%default]")
parser.add_option("", "--fft-size", type="int", default=1024,
help="Set number of FFT bins [default=%default]")
parser.add_option("", "--fft-rate", type="int", default=30,
help="Set FFT update rate, [default=%default]")
parser.add_option("-v", "--verbose", action="store_true", default=False,
help="Use verbose console output [default=%default]")
(options, args) = parser.parse_args()
if len(args) != 0:
parser.print_help()
sys.exit(1)
self.options = options
self._verbose = options.verbose
try:
self.src = osmosdr.source(options.args)
except RuntimeError:
print("Couldn't instanciate source (no device present?).", file=sys.stderr)
sys.exit(1)
try:
self.src.get_sample_rates().start()
except RuntimeError:
print("Source has no sample rates (wrong device arguments?).", file=sys.stderr)
sys.exit(1)
# Set the antenna
if options.antenna:
self.src.set_antenna(options.antenna)
# Set the clock source:
if options.clock_source is not None:
self.src.set_clock_source(options.clock_source)
if options.samp_rate is None:
options.samp_rate = self.src.get_sample_rates().start()
if options.gain is None:
gain = self.src.get_gain()
if gain is None:
# if no gain was specified, use the mid-point in dB
r = self.src.get_gain_range()
try: # empty gain range returned in file= mode
options.gain = float(r.start()+r.stop())/2
except RuntimeError:
options.gain = 0
else:
options.gain = gain
self.src.set_gain(options.gain)
if self._verbose:
gain_names = self.src.get_gain_names()
for name in gain_names:
rg = self.src.get_gain_range(name)
print("%s gain range: start %g stop %g step %g" % (name, rg.start(), rg.stop(), rg.step()))
if options.gains:
for tuple in options.gains.split(","):
name, gain = tuple.split(":")
gain = int(gain)
print("Setting gain %s to %g." % (name, gain))
self.src.set_gain(gain, name)
if self._verbose:
rates = self.src.get_sample_rates()
print('Supported sample rates %.10g-%.10g step %.10g.' % (rates.start(), rates.stop(), rates.step()))
self.bandwidth_ok = True
try:
rg = self.src.get_bandwidth_range()
range_start = rg.start()
if self._verbose:
print('Supported bandwidth rates %.10g-%.10g step %.10g.' % (rg.start(), rg.stop(), rg.step()))
except RuntimeError as ex:
self.bandwidth_ok = False
if options.center_freq is None:
freq = self.src.get_center_freq()
if freq != 0:
options.center_freq = freq
else:
# if no freq was specified, use the mid-point in Hz
r = self.src.get_freq_range()
options.center_freq = float(r.start()+r.stop())/2
if self._verbose:
print("Using auto-calculated mid-point frequency")
input_rate = self.src.set_sample_rate(options.samp_rate)
self.src.set_bandwidth(input_rate)
if self._verbose:
ranges = self.src.get_freq_range()
print("Supported frequencies %s-%s"%(eng_notation.num_to_str(ranges.start()), eng_notation.num_to_str(ranges.stop())))
for name in self.src.get_gain_names():
print("GAIN(%s): %g"%(name, self.src.get_gain(name)))
# initialize values from options
if options.freq_corr is not None:
self.set_freq_corr(options.freq_corr)
self.dc_offset_mode = options.dc_offset_mode
self.iq_balance_mode = options.iq_balance_mode
# initialize reasonable defaults for DC / IQ correction
self.dc_offset_real = 0
self.dc_offset_imag = 0
self.iq_balance_mag = 0
self.iq_balance_pha = 0
if options.fosphor:
from gnuradio import fosphor
self.scope = fosphor.qt_sink_c()
self.scope.set_frequency_range(0, input_rate)
self.scope_win = sip.wrapinstance(self.scope.pyqwidget(), Qt.QWidget)
self.scope_win.setMinimumSize(800, 300)
elif options.waterfall:
self.scope = qtgui.waterfall_sink_c(
options.fft_size,
wintype=window.WIN_BLACKMAN_hARRIS,
fc=0,
bw=input_rate,
name="",
nconnections=1
)
self.scope.enable_grid(False)
self.scope.enable_axis_labels(True)
self.scope.set_intensity_range(-100, 20)
self.scope_win = sip.wrapinstance(self.scope.pyqwidget(), Qt.QWidget)
self.scope_win.setMinimumSize(800, 420)
elif options.oscilloscope:
self.scope = qtgui.time_sink_c(
options.fft_size,
samp_rate=input_rate,
name="",
nconnections=1
)
self.scope_win = sip.wrapinstance(self.scope.pyqwidget(), Qt.QWidget)
self.scope_win.setMinimumSize(800, 600)
elif options.qtgui:
self.scope = qtgui.sink_c(
options.fft_size,
wintype=window.WIN_BLACKMAN_hARRIS,
fc=0,
bw=input_rate,
name="",
plotfreq=True,
plotwaterfall=True,
plottime=True,
plotconst=True
)
self.scope_win = sip.wrapinstance(self.scope.pyqwidget(), Qt.QWidget)
self.scope.set_update_time(1.0/10)
self.scope_win.setMinimumSize(800, 600)
else:
self.scope = qtgui.freq_sink_c(
fftsize=options.fft_size,
wintype=window.WIN_BLACKMAN_hARRIS,
fc=0,
bw=input_rate,
name="",
nconnections=1
)
self.scope_win = sip.wrapinstance(self.scope.pyqwidget(), Qt.QWidget)
self.scope.disable_legend()
self.scope_win.setMinimumSize(800, 420)
self.connect((self.src, 0), (self.scope, 0))
try:
self.freq = freq_recv(self.set_freq)
self.msg_connect((self.scope, 'freq'), (self.freq, 'msg'))
except RuntimeError:
self.freq = None
self.file_sink = blocks.file_sink(gr.sizeof_gr_complex, "/dev/null", False)
self.file_sink.set_unbuffered(False)
self.file_sink.close() # close the sink immediately
# lock/connect/unlock at record button event did not work, so we leave it connected at all times
self.connect(self.src, self.file_sink)
self._build_gui()
# set initial values
if not self.set_freq(options.center_freq):
self._set_status_msg("Failed to set initial frequency")
if options.record is not None:
self._fre.insert(options.record)
def record_to_filename(self):
s = self._fre.text()
s = s.replace('%S', '%e' % self.src.get_sample_rate())
s = s.replace('%F', '%e' % self.src.get_center_freq())
s = s.replace('%T', datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
return s
def _set_status_msg(self, msg, timeout=0):
self.status.showMessage(msg, timeout)
def _shrink(self, widget):
"""Try to shrink RangeWidget by removing unnecessary margins"""
try:
widget.layout().setContentsMargins(0, 0, 0, 0)
widget.children()[0].layout().setContentsMargins(0, 0, 0, 0)
except:
pass
def _add_section(self, text, layout):
"""Add a section header to the GUI"""
frame = Qt.QWidget()
frame_layout = Qt.QHBoxLayout()
frame_layout.setContentsMargins(0, 0, 0, 0)
frame.setLayout(frame_layout)
wid = Qt.QLabel()
wid.setText(text)
wid.setStyleSheet("font-weight: bold;")
frame_layout.addWidget(wid)
wid = Qt.QFrame()
wid.setFrameShape(Qt.QFrame.HLine)
frame_layout.addWidget(wid)
frame_layout.setStretchFactor(wid, 1)
layout.addWidget(frame)
def _chooser(self, names, callback, default=0):
"""A simple radio-button chooser"""
buttons = Qt.QWidget()
blayout = Qt.QHBoxLayout()
bgroup = Qt.QButtonGroup()
buttons.setObjectName("foo")
buttons.setStyleSheet("QWidget#foo {border: 1px outset grey;}")
buttons.setLayout(blayout)
chooser = []
for (num, txt) in enumerate(names):
rb = Qt.QRadioButton(txt)
rb.clicked.connect(partial(callback, num))
chooser.append(rb)
bgroup.addButton(rb,num)
blayout.addWidget(rb)
if num == default:
rb.setChecked(True)
return buttons
def _build_gui(self):
self.top_widget = Qt.QWidget()
self.top_layout = Qt.QVBoxLayout(self.top_widget)
self.top_layout.addWidget(self.scope_win)
self.setCentralWidget(self.top_widget)
self.status = Qt.QStatusBar()
self.setStatusBar(self.status)
self.status.setStyleSheet("QStatusBar{border-top: 1px outset grey;}")
if hasattr(RangeWidget, 'EngSlider'):
eng_widget="eng_slider"
else:
eng_widget="counter_slider"
##################################################
# Frequency controls
##################################################
self._add_section("Frequency", self.top_layout)
r = self.src.get_freq_range()
self._fr = Range(r.start(), r.stop(), (r.start()+r.stop())/100, self.src.get_center_freq(), 200)
self._fw = RangeWidget(self._fr, self.set_freq, 'Center Frequency (Hz)', eng_widget, float)
self._shrink(self._fw)
self.top_layout.addWidget(self._fw)
if hasattr(self, 'ppm') and self.ppm is not None:
self._fcr = Range(-100, 100, 0.1, self.src.get_freq_corr(), 200)
self._fcw = RangeWidget(self._fcr, self.set_freq_corr, 'Freq. Correction (ppm)', "counter_slider", float)
self._shrink(self._fcw)
self.top_layout.addWidget(self._fcw)
##################################################
# Gain controls
##################################################
self._add_section("Gains", self.top_layout)
self._gr={}
self._gw={}
for gain_name in self.src.get_gain_names():
rg = self.src.get_gain_range(gain_name)
self._gr[gain_name] = Range(rg.start(), rg.stop(), rg.step(), self.src.get_gain(gain_name), 100)
self._gw[gain_name] = RangeWidget(self._gr[gain_name], partial(self.set_named_gain,name=gain_name), '%s Gain (dB):'%gain_name, "counter_slider", float)
self._shrink(self._gw[gain_name])
self._gw[gain_name].d_widget.counter.setDecimals(2)
self.top_layout.addWidget(self._gw[gain_name])
##################################################
# Bandwidth controls
##################################################
if self.bandwidth_ok:
self._add_section("Bandwidth", self.top_layout)
r = self.src.get_bandwidth_range()
self._bwr = Range(r.start(), r.stop(), r.step() or (r.stop() - r.start())/100, self.src.get_bandwidth(), 100)
self._bww = RangeWidget(self._bwr, self.set_bandwidth, 'Bandwidth (Hz):', eng_widget, float)
self._shrink(self._bww)
self.top_layout.addWidget(self._bww)
##################################################
# Sample rate controls
##################################################
self._add_section("Sample Rate", self.top_layout)
r = self.src.get_sample_rates()
self._srr = Range(r.start(), r.stop(), r.step() or (r.stop() - r.start())/100, self.src.get_sample_rate(), 100)
self._srw = RangeWidget(self._srr, self.set_sample_rate, 'Sample Rate (Hz)', eng_widget, float)
self._shrink(self._srw)
self.top_layout.addWidget(self._srw)
##################################################
# File recording controls
##################################################
self._add_section("File recording", self.top_layout)
wid = Qt.QWidget()
layout = Qt.QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self._frl = Qt.QLabel('File Name')
layout.addWidget(self._frl)
self._fre = Qt.QLineEdit()
layout.addWidget(self._fre)
self._frb = Qt.QPushButton('REC')
layout.addWidget(self._frb)
wid.setLayout(layout)
self.top_layout.addWidget(wid)
self.recording = 0
def record_callback():
self.recording = 1-self.recording
if self.recording:
self._srw.setDisabled(True)
self._fre.setDisabled(True)
self._frb.setText('STOP')
self.rec_file_name = self.record_to_filename()
print("Recording samples to ", self.rec_file_name)
self.file_sink.open(self.rec_file_name);
else:
self._srw.setDisabled(False)
self._fre.setDisabled(False)
self._frb.setText('REC')
self.file_sink.close()
print("Finished recording to", self.rec_file_name)
self._fre.returnPressed.connect(record_callback)
self._frb.clicked.connect(record_callback)
##################################################
# DC Offset controls
##################################################
if self.dc_offset_mode != None:
self._add_section("DC Offset Correction", self.top_layout)
wid = Qt.QWidget()
layout = Qt.QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self._dcb = self._chooser(["Off", "Manual", "Auto"], self.set_dc_offset_mode, self.dc_offset_mode)
layout.addWidget(self._dcb)
self._dcrr = Range(-1, +1, 0.001, 0, 20)
self._dcrw = RangeWidget(self._dcrr, self.set_dc_offset_real, 'Real', "counter_slider", float)
self._shrink(self._dcrw)
layout.addWidget(self._dcrw)
self._dcir = Range(-1, +1, 0.001, 0, 20)
self._dciw = RangeWidget(self._dcrr, self.set_dc_offset_imag, 'Imag', "counter_slider", float)
self._shrink(self._dciw)
layout.addWidget(self._dciw)
wid.setLayout(layout)
self.top_layout.addWidget(wid)
##################################################
# IQ Imbalance controls
##################################################
if self.iq_balance_mode != None:
self._add_section("IQ Imbalance Correction", self.top_layout)
wid = Qt.QWidget()
layout = Qt.QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self._iqb = self._chooser(["Off", "Manual", "Auto"], self.set_dc_offset_mode, self.iq_balance_mode)
layout.addWidget(self._iqb)
self._iqmr = Range(-1, +1, 0.001, 0, 20)
self._iqmw = RangeWidget(self._iqmr, self.set_iq_balance_mag, 'Mag', "counter_slider", float)
self._shrink(self._iqmw)
layout.addWidget(self._iqmw)
self._iqpr = Range(-1, +1, 0.001, 0, 20)
self._iqpw = RangeWidget(self._iqpr, self.set_iq_balance_pha, 'Pha', "counter_slider", float)
self._shrink(self._iqpw)
layout.addWidget(self._iqpw)
wid.setLayout(layout)
self.top_layout.addWidget(wid)
def set_dc_offset_mode(self, dc_offset_mode):
if dc_offset_mode == 1:
self._dcrw.setDisabled(False)
self._dciw.setDisabled(False)
self.set_dc_offset()
else:
self._dcrw.setDisabled(True)
self._dciw.setDisabled(True)
self.dc_offset_mode = dc_offset_mode
self.src.set_dc_offset_mode(dc_offset_mode)
def set_dc_offset_real(self, value):
self.dc_offset_real = value
self.set_dc_offset()
def set_dc_offset_imag(self, value):
self.dc_offset_imag = value
self.set_dc_offset()
def set_dc_offset(self):
correction = complex(self.dc_offset_real, self.dc_offset_imag)
try:
self.src.set_dc_offset(correction)
if self._verbose:
print("Set DC offset to", correction)
except RuntimeError as ex:
print(ex)
def set_iq_balance_mode(self, iq_balance_mode):
if iq_balance_mode == 1:
self._iqpw.setDisabled(False)
self._iqmw.setDisabled(False)
self.set_iq_balance()
else:
self._iqpw.setDisabled(True)
self._iqmw.setDisabled(True)
self.iq_balance_mode = iq_balance_mode
self.src.set_iq_balance_mode(iq_balance_mode)
def set_iq_balance_mag(self, value):
self.iq_balance_mag = value
self.set_iq_balance()
def set_iq_balance_pha(self, value):
self.iq_balance_pha = value
self.set_iq_balance()
def set_iq_balance(self):
correction = complex(self.iq_balance_mag, self.iq_balance_pha)
try:
self.src.set_iq_balance(correction)
if self._verbose:
print("Set IQ balance to", correction)
except RuntimeError as ex:
print(ex)
def set_sample_rate(self, samp_rate):
samp_rate = self.src.set_sample_rate(samp_rate)
if hasattr(self.scope, 'set_frequency_range'):
self.scope.set_frequency_range(self.src.get_center_freq(), samp_rate)
if hasattr(self.scope, 'set_sample_rate'):
self.scope.set_sample_rate(samp_rate)
if self._verbose:
print("Set sample rate to:", samp_rate)
try:
if hasattr(self._bww.d_widget, 'setValue'):
self._bww.d_widget.setValue(samp_rate)
else:
self._bww.d_widget.counter.setValue(samp_rate)
except (RuntimeError, AttributeError):
pass
return samp_rate
def set_named_gain(self, gain, name):
if self._verbose:
print("Trying to set " + name + " gain to:", gain)
gain = self.src.set_gain(gain, name)
if self._verbose:
print("Set " + name + " gain to:", gain)
def set_bandwidth(self, bw):
if self._verbose:
print("Trying to set bandwidth to:", bw)
clipped_bw = self.src.get_bandwidth_range().clip(bw)
if self._verbose:
print("Clipping bandwidth to:", clipped_bw)
if self.src.get_bandwidth() != clipped_bw:
bw = self.src.set_bandwidth(clipped_bw)
if self._verbose:
print("Set bandwidth to:", bw)
return bw
def set_freq(self, freq):
freq = self.src.set_center_freq(freq)
if hasattr(self.scope, 'set_frequency_range'):
self.scope.set_frequency_range(freq, self.src.get_sample_rate())
if hasattr(self.scope, 'set_baseband_freq'):
self.scope.set_baseband_freq(freq)
try:
if hasattr(self._fw.d_widget, 'setValue'):
self._fw.d_widget.setValue(freq)
else:
self._fw.d_widget.counter.setValue(freq)
except (RuntimeError, AttributeError):
pass
if freq is not None:
if self._verbose:
print("Set center frequency to %.10g"%freq)
elif self._verbose:
print("Failed to set freq.")
return freq
def set_freq_corr(self, ppm):
self.ppm = self.src.set_freq_corr(ppm)
if self._verbose:
print("Set frequency correction to:", self.ppm)
def main():
qapp = Qt.QApplication(sys.argv)
tb = app_top_block(qapp.arguments(), "osmocom Spectrum Browser")
tb.start()
tb.show()
def sig_handler(sig=None, frame=None):
print("caught signal")
Qt.QApplication.quit()
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
# this timer is necessary for signals (^C) to work
timer = Qt.QTimer()
timer.start(500)
timer.timeout.connect(lambda: None)
def quitting():
tb.stop()
tb.wait()
qapp.aboutToQuit.connect(quitting)
qapp.exec_()
if __name__ == '__main__':
main()