#!/usr/bin/env python3 # # Copyright 2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # # GNU Radio is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # GNU Radio is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with GNU Radio; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # import osmosdr from gnuradio import blocks from gnuradio import gr from gnuradio import eng_notation from gnuradio.fft import window from gnuradio.eng_option import eng_option from optparse import OptionParser from functools import partial import sys import signal import time import datetime try: from PyQt5 import Qt from gnuradio import qtgui import sip from gnuradio.qtgui import Range, RangeWidget except ImportError: sys.stderr.write("Error importing GNU Radio's Qtgui.\n") sys.exit(1) class CallEvent(Qt.QEvent): """An event containing a request for a function call.""" EVENT_TYPE = Qt.QEvent.Type(Qt.QEvent.registerEventType()) def __init__(self, fn, *args, **kwargs): Qt.QEvent.__init__(self, self.EVENT_TYPE) self.fn = fn self.args = args self.kwargs = kwargs class freq_recv(gr.sync_block, Qt.QObject): def __init__(self, callback): gr.sync_block.__init__(self, name="freq_recv", in_sig=None, out_sig=None) Qt.QObject.__init__(self) self.set_freq=callback # Advertise 'msg' port self.message_port_register_in(gr.pmt.intern('msg')) self.set_msg_handler(gr.pmt.intern('msg'), self.handle_msg) def handle_msg(self, msg_pmt): # Unpack message & call set_freq on main thread meta = gr.pmt.to_python(gr.pmt.car(msg_pmt)) msg = gr.pmt.cdr(msg_pmt) if meta=="freq": freq = gr.pmt.to_double(msg) Qt.QCoreApplication.postEvent(self, CallEvent(self.set_freq, freq)) def event(self, event): event.accept() result = event.fn(*event.args, **event.kwargs) return True class app_top_block(gr.top_block, Qt.QMainWindow): def __init__(self, argv, title): gr.top_block.__init__(self, title) Qt.QMainWindow.__init__(self) self.setWindowTitle(title) parser = OptionParser(option_class=eng_option) parser.add_option("-a", "--args", type="string", default="", help="Device args, [default=%default]") parser.add_option("-A", "--antenna", type="string", default=None, help="Select RX antenna where appropriate") parser.add_option("", "--clock-source", help="Set the clock source; typically 'internal', 'external', 'external_1pps', 'mimo' or 'gpsdo'") parser.add_option("-s", "--samp-rate", type="eng_float", default=None, help="Set sample rate (bandwidth), minimum by default") parser.add_option("-f", "--center-freq", type="eng_float", default=None, help="Set frequency to FREQ", metavar="FREQ") parser.add_option("-c", "--freq-corr", type="eng_float", default=None, help="Set frequency correction (ppm)") parser.add_option("-g", "--gain", type="eng_float", default=None, help="Set gain in dB (default is midpoint)") parser.add_option("-G", "--gains", type="string", default=None, help="Set named gain in dB, name:gain,name:gain,...") parser.add_option("-r", "--record", type="string", default="/tmp/name-f%F-s%S-t%T.cfile", help="Filename to record to, available wildcards: %S: sample rate, %F: center frequency, %T: timestamp, Example: /tmp/name-f%F-s%S-t%T.cfile") parser.add_option("", "--dc-offset-mode", type="int", default=None, help="Set the RX frontend DC offset correction mode") parser.add_option("", "--iq-balance-mode", type="int", default=None, help="Set the RX frontend IQ imbalance correction mode") parser.add_option("-W", "--waterfall", action="store_true", default=False, help="Enable waterfall display") parser.add_option("-F", "--fosphor", action="store_true", default=False, help="Enable fosphor display") parser.add_option("-S", "--oscilloscope", action="store_true", default=False, help="Enable oscilloscope display") parser.add_option("-Q", "--qtgui", action="store_true", default=False, help="Enable QTgui 'all-in-one' display") parser.add_option("", "--avg-alpha", type="eng_float", default=1e-1, help="Set fftsink averaging factor, default=[%default]") parser.add_option("", "--averaging", action="store_true", default=False, help="Enable fftsink averaging, default=[%default]") parser.add_option("", "--peak-hold", action="store_true", default=False, help="Enable fftsink peak hold, default=[%default]") parser.add_option("", "--ref-scale", type="eng_float", default=1.0, help="Set dBFS=0dB input value, default=[%default]") parser.add_option("", "--fft-size", type="int", default=1024, help="Set number of FFT bins [default=%default]") parser.add_option("", "--fft-rate", type="int", default=30, help="Set FFT update rate, [default=%default]") parser.add_option("-v", "--verbose", action="store_true", default=False, help="Use verbose console output [default=%default]") (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() sys.exit(1) self.options = options self._verbose = options.verbose try: self.src = osmosdr.source(options.args) except RuntimeError: print("Couldn't instanciate source (no device present?).", file=sys.stderr) sys.exit(1) try: self.src.get_sample_rates().start() except RuntimeError: print("Source has no sample rates (wrong device arguments?).", file=sys.stderr) sys.exit(1) # Set the antenna if options.antenna: self.src.set_antenna(options.antenna) # Set the clock source: if options.clock_source is not None: self.src.set_clock_source(options.clock_source) if options.samp_rate is None: options.samp_rate = self.src.get_sample_rates().start() if options.gain is None: gain = self.src.get_gain() if gain is None: # if no gain was specified, use the mid-point in dB r = self.src.get_gain_range() try: # empty gain range returned in file= mode options.gain = float(r.start()+r.stop())/2 except RuntimeError: options.gain = 0 else: options.gain = gain self.src.set_gain(options.gain) if self._verbose: gain_names = self.src.get_gain_names() for name in gain_names: rg = self.src.get_gain_range(name) print("%s gain range: start %g stop %g step %g" % (name, rg.start(), rg.stop(), rg.step())) if options.gains: for tuple in options.gains.split(","): name, gain = tuple.split(":") gain = int(gain) print("Setting gain %s to %g." % (name, gain)) self.src.set_gain(gain, name) if self._verbose: rates = self.src.get_sample_rates() print('Supported sample rates %.10g-%.10g step %.10g.' % (rates.start(), rates.stop(), rates.step())) self.bandwidth_ok = True try: rg = self.src.get_bandwidth_range() range_start = rg.start() if self._verbose: print('Supported bandwidth rates %.10g-%.10g step %.10g.' % (rg.start(), rg.stop(), rg.step())) except RuntimeError as ex: self.bandwidth_ok = False if options.center_freq is None: freq = self.src.get_center_freq() if freq != 0: options.center_freq = freq else: # if no freq was specified, use the mid-point in Hz r = self.src.get_freq_range() options.center_freq = float(r.start()+r.stop())/2 if self._verbose: print("Using auto-calculated mid-point frequency") input_rate = self.src.set_sample_rate(options.samp_rate) self.src.set_bandwidth(input_rate) if self._verbose: ranges = self.src.get_freq_range() print("Supported frequencies %s-%s"%(eng_notation.num_to_str(ranges.start()), eng_notation.num_to_str(ranges.stop()))) for name in self.src.get_gain_names(): print("GAIN(%s): %g"%(name, self.src.get_gain(name))) # initialize values from options if options.freq_corr is not None: self.set_freq_corr(options.freq_corr) self.dc_offset_mode = options.dc_offset_mode self.iq_balance_mode = options.iq_balance_mode # initialize reasonable defaults for DC / IQ correction self.dc_offset_real = 0 self.dc_offset_imag = 0 self.iq_balance_mag = 0 self.iq_balance_pha = 0 # see https://github.com/gnuradio/gnuradio/issues/5175 - 3.9 has a backport of pyqwidget, but 3.10 does not. check_qwidget = lambda : self.scope.pyqwidget() if "pyqwidget" in dir(self.scope) else self.scope.qwidget() if options.fosphor: from gnuradio import fosphor self.scope = fosphor.qt_sink_c() self.scope.set_frequency_range(0, input_rate) self.scope_win = sip.wrapinstance(check_qwidget(), Qt.QWidget) self.scope_win.setMinimumSize(800, 300) elif options.waterfall: self.scope = qtgui.waterfall_sink_c( options.fft_size, wintype=window.WIN_BLACKMAN_hARRIS, fc=0, bw=input_rate, name="", nconnections=1 ) self.scope.enable_grid(False) self.scope.enable_axis_labels(True) self.scope.set_intensity_range(-100, 20) self.scope_win = sip.wrapinstance(check_qwidget(), Qt.QWidget) self.scope_win.setMinimumSize(800, 420) elif options.oscilloscope: self.scope = qtgui.time_sink_c( options.fft_size, samp_rate=input_rate, name="", nconnections=1 ) self.scope_win = sip.wrapinstance(check_qwidget(), Qt.QWidget) self.scope_win.setMinimumSize(800, 600) elif options.qtgui: self.scope = qtgui.sink_c( options.fft_size, wintype=window.WIN_BLACKMAN_hARRIS, fc=0, bw=input_rate, name="", plotfreq=True, plotwaterfall=True, plottime=True, plotconst=True ) self.scope_win = sip.wrapinstance(check_qwidget(), Qt.QWidget) self.scope.set_update_time(1.0/10) self.scope_win.setMinimumSize(800, 600) else: self.scope = qtgui.freq_sink_c( fftsize=options.fft_size, wintype=window.WIN_BLACKMAN_hARRIS, fc=0, bw=input_rate, name="", nconnections=1 ) self.scope_win = sip.wrapinstance(check_qwidget(), Qt.QWidget) self.scope.disable_legend() self.scope_win.setMinimumSize(800, 420) self.connect((self.src, 0), (self.scope, 0)) try: self.freq = freq_recv(self.set_freq) self.msg_connect((self.scope, 'freq'), (self.freq, 'msg')) except RuntimeError: self.freq = None self.file_sink = blocks.file_sink(gr.sizeof_gr_complex, "/dev/null", False) self.file_sink.set_unbuffered(False) self.file_sink.close() # close the sink immediately # lock/connect/unlock at record button event did not work, so we leave it connected at all times self.connect(self.src, self.file_sink) self._build_gui() # set initial values if not self.set_freq(options.center_freq): self._set_status_msg("Failed to set initial frequency") if options.record is not None: self._fre.insert(options.record) def record_to_filename(self): s = self._fre.text() s = s.replace('%S', '%e' % self.src.get_sample_rate()) s = s.replace('%F', '%e' % self.src.get_center_freq()) s = s.replace('%T', datetime.datetime.now().strftime('%Y%m%d%H%M%S')) return s def _set_status_msg(self, msg, timeout=0): self.status.showMessage(msg, timeout) def _shrink(self, widget): """Try to shrink RangeWidget by removing unnecessary margins""" try: widget.layout().setContentsMargins(0, 0, 0, 0) widget.children()[0].layout().setContentsMargins(0, 0, 0, 0) except: pass def _add_section(self, text, layout): """Add a section header to the GUI""" frame = Qt.QWidget() frame_layout = Qt.QHBoxLayout() frame_layout.setContentsMargins(0, 0, 0, 0) frame.setLayout(frame_layout) wid = Qt.QLabel() wid.setText(text) wid.setStyleSheet("font-weight: bold;") frame_layout.addWidget(wid) wid = Qt.QFrame() wid.setFrameShape(Qt.QFrame.HLine) frame_layout.addWidget(wid) frame_layout.setStretchFactor(wid, 1) layout.addWidget(frame) def _chooser(self, names, callback, default=0): """A simple radio-button chooser""" buttons = Qt.QWidget() blayout = Qt.QHBoxLayout() bgroup = Qt.QButtonGroup() buttons.setObjectName("foo") buttons.setStyleSheet("QWidget#foo {border: 1px outset grey;}") buttons.setLayout(blayout) chooser = [] for (num, txt) in enumerate(names): rb = Qt.QRadioButton(txt) rb.clicked.connect(partial(callback, num)) chooser.append(rb) bgroup.addButton(rb,num) blayout.addWidget(rb) if num == default: rb.setChecked(True) return buttons def _build_gui(self): self.top_widget = Qt.QWidget() self.top_layout = Qt.QVBoxLayout(self.top_widget) self.top_layout.addWidget(self.scope_win) self.setCentralWidget(self.top_widget) self.status = Qt.QStatusBar() self.setStatusBar(self.status) self.status.setStyleSheet("QStatusBar{border-top: 1px outset grey;}") if hasattr(RangeWidget, 'EngSlider'): eng_widget="eng_slider" else: eng_widget="counter_slider" ################################################## # Frequency controls ################################################## self._add_section("Frequency", self.top_layout) r = self.src.get_freq_range() self._fr = Range(r.start(), r.stop(), (r.start()+r.stop())/100, self.src.get_center_freq(), 200) self._fw = RangeWidget(self._fr, self.set_freq, 'Center Frequency (Hz)', eng_widget, float) self._shrink(self._fw) self.top_layout.addWidget(self._fw) if hasattr(self, 'ppm') and self.ppm is not None: self._fcr = Range(-100, 100, 0.1, self.src.get_freq_corr(), 200) self._fcw = RangeWidget(self._fcr, self.set_freq_corr, 'Freq. Correction (ppm)', "counter_slider", float) self._shrink(self._fcw) self.top_layout.addWidget(self._fcw) ################################################## # Gain controls ################################################## self._add_section("Gains", self.top_layout) self._gr={} self._gw={} for gain_name in self.src.get_gain_names(): rg = self.src.get_gain_range(gain_name) self._gr[gain_name] = Range(rg.start(), rg.stop(), rg.step(), self.src.get_gain(gain_name), 100) self._gw[gain_name] = RangeWidget(self._gr[gain_name], partial(self.set_named_gain,name=gain_name), '%s Gain (dB):'%gain_name, "counter_slider", float) self._shrink(self._gw[gain_name]) self._gw[gain_name].d_widget.counter.setDecimals(2) self.top_layout.addWidget(self._gw[gain_name]) ################################################## # Bandwidth controls ################################################## if self.bandwidth_ok: self._add_section("Bandwidth", self.top_layout) r = self.src.get_bandwidth_range() self._bwr = Range(r.start(), r.stop(), r.step() or (r.stop() - r.start())/100, self.src.get_bandwidth(), 100) self._bww = RangeWidget(self._bwr, self.set_bandwidth, 'Bandwidth (Hz):', eng_widget, float) self._shrink(self._bww) self.top_layout.addWidget(self._bww) ################################################## # Sample rate controls ################################################## self._add_section("Sample Rate", self.top_layout) r = self.src.get_sample_rates() self._srr = Range(r.start(), r.stop(), r.step() or (r.stop() - r.start())/100, self.src.get_sample_rate(), 100) self._srw = RangeWidget(self._srr, self.set_sample_rate, 'Sample Rate (Hz)', eng_widget, float) self._shrink(self._srw) self.top_layout.addWidget(self._srw) ################################################## # File recording controls ################################################## self._add_section("File recording", self.top_layout) wid = Qt.QWidget() layout = Qt.QHBoxLayout() layout.setContentsMargins(0, 0, 0, 0) self._frl = Qt.QLabel('File Name') layout.addWidget(self._frl) self._fre = Qt.QLineEdit() layout.addWidget(self._fre) self._frb = Qt.QPushButton('REC') layout.addWidget(self._frb) wid.setLayout(layout) self.top_layout.addWidget(wid) self.recording = 0 def record_callback(): self.recording = 1-self.recording if self.recording: self._srw.setDisabled(True) self._fre.setDisabled(True) self._frb.setText('STOP') self.rec_file_name = self.record_to_filename() print("Recording samples to ", self.rec_file_name) self.file_sink.open(self.rec_file_name); else: self._srw.setDisabled(False) self._fre.setDisabled(False) self._frb.setText('REC') self.file_sink.close() print("Finished recording to", self.rec_file_name) self._fre.returnPressed.connect(record_callback) self._frb.clicked.connect(record_callback) ################################################## # DC Offset controls ################################################## if self.dc_offset_mode != None: self._add_section("DC Offset Correction", self.top_layout) wid = Qt.QWidget() layout = Qt.QHBoxLayout() layout.setContentsMargins(0, 0, 0, 0) self._dcb = self._chooser(["Off", "Manual", "Auto"], self.set_dc_offset_mode, self.dc_offset_mode) layout.addWidget(self._dcb) self._dcrr = Range(-1, +1, 0.001, 0, 20) self._dcrw = RangeWidget(self._dcrr, self.set_dc_offset_real, 'Real', "counter_slider", float) self._shrink(self._dcrw) layout.addWidget(self._dcrw) self._dcir = Range(-1, +1, 0.001, 0, 20) self._dciw = RangeWidget(self._dcrr, self.set_dc_offset_imag, 'Imag', "counter_slider", float) self._shrink(self._dciw) layout.addWidget(self._dciw) wid.setLayout(layout) self.top_layout.addWidget(wid) ################################################## # IQ Imbalance controls ################################################## if self.iq_balance_mode != None: self._add_section("IQ Imbalance Correction", self.top_layout) wid = Qt.QWidget() layout = Qt.QHBoxLayout() layout.setContentsMargins(0, 0, 0, 0) self._iqb = self._chooser(["Off", "Manual", "Auto"], self.set_dc_offset_mode, self.iq_balance_mode) layout.addWidget(self._iqb) self._iqmr = Range(-1, +1, 0.001, 0, 20) self._iqmw = RangeWidget(self._iqmr, self.set_iq_balance_mag, 'Mag', "counter_slider", float) self._shrink(self._iqmw) layout.addWidget(self._iqmw) self._iqpr = Range(-1, +1, 0.001, 0, 20) self._iqpw = RangeWidget(self._iqpr, self.set_iq_balance_pha, 'Pha', "counter_slider", float) self._shrink(self._iqpw) layout.addWidget(self._iqpw) wid.setLayout(layout) self.top_layout.addWidget(wid) def set_dc_offset_mode(self, dc_offset_mode): if dc_offset_mode == 1: self._dcrw.setDisabled(False) self._dciw.setDisabled(False) self.set_dc_offset() else: self._dcrw.setDisabled(True) self._dciw.setDisabled(True) self.dc_offset_mode = dc_offset_mode self.src.set_dc_offset_mode(dc_offset_mode) def set_dc_offset_real(self, value): self.dc_offset_real = value self.set_dc_offset() def set_dc_offset_imag(self, value): self.dc_offset_imag = value self.set_dc_offset() def set_dc_offset(self): correction = complex(self.dc_offset_real, self.dc_offset_imag) try: self.src.set_dc_offset(correction) if self._verbose: print("Set DC offset to", correction) except RuntimeError as ex: print(ex) def set_iq_balance_mode(self, iq_balance_mode): if iq_balance_mode == 1: self._iqpw.setDisabled(False) self._iqmw.setDisabled(False) self.set_iq_balance() else: self._iqpw.setDisabled(True) self._iqmw.setDisabled(True) self.iq_balance_mode = iq_balance_mode self.src.set_iq_balance_mode(iq_balance_mode) def set_iq_balance_mag(self, value): self.iq_balance_mag = value self.set_iq_balance() def set_iq_balance_pha(self, value): self.iq_balance_pha = value self.set_iq_balance() def set_iq_balance(self): correction = complex(self.iq_balance_mag, self.iq_balance_pha) try: self.src.set_iq_balance(correction) if self._verbose: print("Set IQ balance to", correction) except RuntimeError as ex: print(ex) def set_sample_rate(self, samp_rate): samp_rate = self.src.set_sample_rate(samp_rate) if hasattr(self.scope, 'set_frequency_range'): self.scope.set_frequency_range(self.src.get_center_freq(), samp_rate) if hasattr(self.scope, 'set_sample_rate'): self.scope.set_sample_rate(samp_rate) if self._verbose: print("Set sample rate to:", samp_rate) try: if hasattr(self._bww.d_widget, 'setValue'): self._bww.d_widget.setValue(samp_rate) else: self._bww.d_widget.counter.setValue(samp_rate) except (RuntimeError, AttributeError): pass return samp_rate def set_named_gain(self, gain, name): if self._verbose: print("Trying to set " + name + " gain to:", gain) gain = self.src.set_gain(gain, name) if self._verbose: print("Set " + name + " gain to:", gain) def set_bandwidth(self, bw): if self._verbose: print("Trying to set bandwidth to:", bw) clipped_bw = self.src.get_bandwidth_range().clip(bw) if self._verbose: print("Clipping bandwidth to:", clipped_bw) if self.src.get_bandwidth() != clipped_bw: bw = self.src.set_bandwidth(clipped_bw) if self._verbose: print("Set bandwidth to:", bw) return bw def set_freq(self, freq): freq = self.src.set_center_freq(freq) if hasattr(self.scope, 'set_frequency_range'): self.scope.set_frequency_range(freq, self.src.get_sample_rate()) if hasattr(self.scope, 'set_baseband_freq'): self.scope.set_baseband_freq(freq) try: if hasattr(self._fw.d_widget, 'setValue'): self._fw.d_widget.setValue(freq) else: self._fw.d_widget.counter.setValue(freq) except (RuntimeError, AttributeError): pass if freq is not None: if self._verbose: print("Set center frequency to %.10g"%freq) elif self._verbose: print("Failed to set freq.") return freq def set_freq_corr(self, ppm): self.ppm = self.src.set_freq_corr(ppm) if self._verbose: print("Set frequency correction to:", self.ppm) def main(): qapp = Qt.QApplication(sys.argv) tb = app_top_block(qapp.arguments(), "osmocom Spectrum Browser") tb.start() tb.show() def sig_handler(sig=None, frame=None): print("caught signal") Qt.QApplication.quit() signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) # this timer is necessary for signals (^C) to work timer = Qt.QTimer() timer.start(500) timer.timeout.connect(lambda: None) def quitting(): tb.stop() tb.wait() qapp.aboutToQuit.connect(quitting) qapp.exec_() if __name__ == '__main__': main()