demod: Import the new GR 3.7 code

This adds demodulator code compatible with gnuradio 3.7 series.

Change-Id: Ibaf7f9c552cc1625210a75f2e9ec142ab47ce8d6
Signed-off-by: Sylvain Munaut <tnt@246tNt.com>
This commit is contained in:
Harald Welte 2016-12-18 16:44:04 +01:00
parent 359ef6ba24
commit 409d2985cb
2 changed files with 600 additions and 0 deletions

345
src/demod/cqpsk.py Normal file
View File

@ -0,0 +1,345 @@
#
# Copyright 2005,2006,2007 Free Software Foundation, Inc.
#
# cqpsk.py (C) Copyright 2009, KA1RBI
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
# See gnuradio-examples/python/digital for examples
"""
differential PI/4 CQPSK modulation and demodulation.
"""
from gnuradio import gr, gru, analog, blocks, digital, filter
from math import pi, sqrt
import cmath
from pprint import pprint
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 10
_def_excess_bw = 0.35
_def_gray_code = True
_def_verbose = False
_def_log = False
_def_costas_alpha = 0.15
_def_gain_mu = None
_def_mu = 0.5
_def_omega_relative_limit = 0.005
# /////////////////////////////////////////////////////////////////////////////
# CQPSK modulator
# /////////////////////////////////////////////////////////////////////////////
class cqpsk_mod(gr.hier_block2):
def __init__(self,
samples_per_symbol=_def_samples_per_symbol,
excess_bw=_def_excess_bw,
verbose=_def_verbose,
log=_def_log):
"""
Hierarchical block for RRC-filtered QPSK modulation.
The input is a byte stream (unsigned char) and the
output is the complex modulated signal at baseband.
@param samples_per_symbol: samples per symbol >= 2
@type samples_per_symbol: integer
@param excess_bw: Root-raised cosine filter excess bandwidth
@type excess_bw: float
@param verbose: Print information about modulator?
@type verbose: bool
@param debug: Print modualtion data to files?
@type debug: bool
"""
gr.hier_block2.__init__(self, "cqpsk_mod",
gr.io_signature(1, 1, gr.sizeof_char), # Input signature
gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2:
raise TypeError, ("sbp must be an integer >= 2, is %d" % samples_per_symbol)
ntaps = 11 * samples_per_symbol
arity = 8
# turn bytes into k-bit vectors
self.bytes2chunks = \
gr.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST)
# 0 +45 1 [+1]
# 1 +135 3 [+3]
# 2 -45 7 [-1]
# 3 -135 5 [-3]
self.pi4map = [1, 3, 7, 5]
self.symbol_mapper = gr.map_bb(self.pi4map)
self.diffenc = gr.diff_encoder_bb(arity)
self.chunks2symbols = gr.chunks_to_symbols_bc(psk.constellation[arity])
# pulse shaping filter
self.rrc_taps = filter.firdes.root_raised_cosine(
self._samples_per_symbol, # gain (sps since we're interpolating by sps)
self._samples_per_symbol, # sampling rate
1.0, # symbol rate
self._excess_bw, # excess bandwidth (roll-off factor)
ntaps)
self.rrc_filter = filter.interp_fir_filter_ccf(self._samples_per_symbol, self.rrc_taps)
if verbose:
self._print_verbage()
if log:
self._setup_logging()
# Connect & Initialize base class
self.connect(self, self.bytes2chunks, self.symbol_mapper, self.diffenc,
self.chunks2symbols, self.rrc_filter, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@staticmethod
def bits_per_symbol():
return 2
def _print_verbage(self):
print "\nModulator:"
print "bits per symbol: %d" % self.bits_per_symbol()
print "Gray code: %s" % self._gray_code
print "RRS roll-off factor: %f" % self._excess_bw
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.bytes2chunks,
gr.file_sink(gr.sizeof_char, "tx_bytes2chunks.dat"))
self.connect(self.symbol_mapper,
gr.file_sink(gr.sizeof_char, "tx_graycoder.dat"))
self.connect(self.diffenc,
gr.file_sink(gr.sizeof_char, "tx_diffenc.dat"))
self.connect(self.chunks2symbols,
gr.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.dat"))
self.connect(self.rrc_filter,
gr.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.dat"))
@staticmethod
def add_options(parser):
"""
Adds QPSK modulation-specific options to the standard parser
"""
parser.add_option("", "--excess-bw", type="float", default=_def_excess_bw,
help="set RRC excess bandwith factor [default=%default] (PSK)")
parser.add_option("", "--no-gray-code", dest="gray_code",
action="store_false", default=_def_gray_code,
help="disable gray coding on modulated bits (PSK)")
@staticmethod
def extract_kwargs_from_options(options):
"""
Given command line options, create dictionary suitable for passing to __init__
"""
return modulation_utils.extract_kwargs_from_options(dqpsk_mod.__init__,
('self',), options)
# /////////////////////////////////////////////////////////////////////////////
# CQPSK demodulator
#
# /////////////////////////////////////////////////////////////////////////////
class cqpsk_demod(gr.hier_block2):
def __init__(self,
samples_per_symbol=_def_samples_per_symbol,
excess_bw=_def_excess_bw,
costas_alpha=_def_costas_alpha,
gain_mu=_def_gain_mu,
mu=_def_mu,
omega_relative_limit=_def_omega_relative_limit,
gray_code=_def_gray_code,
verbose=_def_verbose,
log=_def_log):
"""
Hierarchical block for RRC-filtered CQPSK demodulation
The input is the complex modulated signal at baseband.
The output is a stream of floats in [ -3 / -1 / +1 / +3 ]
@param samples_per_symbol: samples per symbol >= 2
@type samples_per_symbol: float
@param excess_bw: Root-raised cosine filter excess bandwidth
@type excess_bw: float
@param costas_alpha: loop filter gain
@type costas_alphas: float
@param gain_mu: for M&M block
@type gain_mu: float
@param mu: for M&M block
@type mu: float
@param omega_relative_limit: for M&M block
@type omega_relative_limit: float
@param gray_code: Tell modulator to Gray code the bits
@type gray_code: bool
@param verbose: Print information about modulator?
@type verbose: bool
@param debug: Print modualtion data to files?
@type debug: bool
"""
gr.hier_block2.__init__(self, "cqpsk_demod",
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
gr.io_signature(1, 1, gr.sizeof_float)) # Output signature
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
self._costas_alpha = costas_alpha
self._mm_gain_mu = gain_mu
self._mm_mu = mu
self._mm_omega_relative_limit = omega_relative_limit
self._gray_code = gray_code
if samples_per_symbol < 2:
raise TypeError, "sbp must be >= 2, is %d" % samples_per_symbol
arity = pow(2,self.bits_per_symbol())
# Automatic gain control
scale = (1.0/16384.0)
self.pre_scaler = blocks.multiply_const_cc(scale) # scale the signal from full-range to +-1
#self.agc = analog.agc2_cc(0.6e-1, 1e-3, 1, 1, 100)
self.agc = analog.feedforward_agc_cc(16, 2.0)
# RRC data filter
ntaps = 11 * samples_per_symbol
self.rrc_taps = filter.firdes.root_raised_cosine(
1.0, # gain
self._samples_per_symbol, # sampling rate
1.0, # symbol rate
self._excess_bw, # excess bandwidth (roll-off factor)
ntaps)
self.rrc_filter=filter.interp_fir_filter_ccf(1, self.rrc_taps)
if not self._mm_gain_mu:
sbs_to_mm = {2: 0.050, 3: 0.075, 4: 0.11, 5: 0.125, 6: 0.15, 7: 0.15}
self._mm_gain_mu = sbs_to_mm[samples_per_symbol]
self._mm_omega = self._samples_per_symbol
self._mm_gain_omega = .25 * self._mm_gain_mu * self._mm_gain_mu
self._costas_beta = 0.25 * self._costas_alpha * self._costas_alpha
fmin = -0.025
fmax = 0.025
self.receiver=digital.mpsk_receiver_cc(
arity, pi/4.0,
2*pi/150,
fmin, fmax,
self._mm_mu, self._mm_gain_mu,
self._mm_omega, self._mm_gain_omega,
self._mm_omega_relative_limit
)
self.receiver.set_alpha(self._costas_alpha)
self.receiver.set_beta(self._costas_beta)
# Perform Differential decoding on the constellation
self.diffdec = digital.diff_phasor_cc()
# take angle of the difference (in radians)
self.to_float = blocks.complex_to_arg()
# convert from radians such that signal is in -3/-1/+1/+3
self.rescale = blocks.multiply_const_ff( 1 / (pi / 4) )
if verbose:
self._print_verbage()
if log:
self._setup_logging()
# Connect & Initialize base class
self.connect(self, self.pre_scaler, self.agc, self.rrc_filter, self.receiver,
self.diffdec, self.to_float, self.rescale, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@staticmethod
def bits_per_symbol():
return 2
def _print_verbage(self):
print "\nDemodulator:"
print "bits per symbol: %d" % self.bits_per_symbol()
print "Gray code: %s" % self._gray_code
print "RRC roll-off factor: %.2f" % self._excess_bw
print "Costas Loop alpha: %.2e" % self._costas_alpha
print "Costas Loop beta: %.2e" % self._costas_beta
print "M&M mu: %.2f" % self._mm_mu
print "M&M mu gain: %.2e" % self._mm_gain_mu
print "M&M omega: %.2f" % self._mm_omega
print "M&M omega gain: %.2e" % self._mm_gain_omega
print "M&M omega limit: %.2f" % self._mm_omega_relative_limit
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.pre_scaler,
gr.file_sink(gr.sizeof_gr_complex, "rx_prescaler.dat"))
self.connect(self.agc,
gr.file_sink(gr.sizeof_gr_complex, "rx_agc.dat"))
self.connect(self.rrc_filter,
gr.file_sink(gr.sizeof_gr_complex, "rx_rrc_filter.dat"))
self.connect(self.receiver,
gr.file_sink(gr.sizeof_gr_complex, "rx_receiver.dat"))
self.connect(self.diffdec,
gr.file_sink(gr.sizeof_gr_complex, "rx_diffdec.dat"))
self.connect(self.to_float,
gr.file_sink(gr.sizeof_float, "rx_to_float.dat"))
self.connect(self.rescale,
gr.file_sink(gr.sizeof_float, "rx_rescale.dat"))
@staticmethod
def add_options(parser):
"""
Adds modulation-specific options to the standard parser
"""
parser.add_option("", "--excess-bw", type="float", default=_def_excess_bw,
help="set RRC excess bandwith factor [default=%default] (PSK)")
parser.add_option("", "--no-gray-code", dest="gray_code",
action="store_false", default=_def_gray_code,
help="disable gray coding on modulated bits (PSK)")
parser.add_option("", "--costas-alpha", type="float", default=_def_costas_alpha,
help="set Costas loop alpha value [default=%default] (PSK)")
parser.add_option("", "--gain-mu", type="float", default=_def_gain_mu,
help="set M&M symbol sync loop gain mu value [default=%default] (PSK)")
parser.add_option("", "--mu", type="float", default=_def_mu,
help="set M&M symbol sync loop mu value [default=%default] (PSK)")
@staticmethod
def extract_kwargs_from_options(options):
"""
Given command line options, create dictionary suitable for passing to __init__
"""
return modulation_utils.extract_kwargs_from_options(
cqpsk_demod.__init__, ('self',), options)

View File

@ -0,0 +1,255 @@
#!/usr/bin/env python
# Copyright 2012 Dimitri Stolnikov <horiz0n@gmx.net>
# Usage:
# src$ ./demod/python/osmosdr-tetra_demod_fft.py -o /dev/stdout | ./float_to_bits /dev/stdin /dev/stdout | ./tetra-rx /dev/stdin
#
# Adjust the center frequency (-f) and gain (-g) according to your needs.
# Use left click in Wideband Spectrum window to roughly select a TETRA carrier.
# In Wideband Spectrum you can also tune by 1/4 of the bandwidth by clicking on the rightmost/leftmost spectrum side.
# Use left click in Channel Spectrum windows to fine tune the carrier by clicking on the left or right side of the spectrum.
import sys
import math
from gnuradio import gr, gru, eng_notation, blocks, filter
from gnuradio.filter import pfb
from gnuradio.eng_option import eng_option
from gnuradio.wxgui import fftsink2, scopesink2, forms, TRIG_MODE_AUTO
from grc_gnuradio import wxgui as grc_wxgui
from optparse import OptionParser
import osmosdr
import wx
try:
import cqpsk
except:
from tetra_demod import cqpsk
# applies frequency translation, resampling and demodulation
class top_block(grc_wxgui.top_block_gui):
def __init__(self):
grc_wxgui.top_block_gui.__init__(self, title="Top Block")
options = get_options()
self.ifreq = options.frequency
self.rfgain = options.gain
self.src = osmosdr.source(options.args)
self.src.set_center_freq(self.ifreq)
self.src.set_sample_rate(int(options.sample_rate))
if self.rfgain is None:
self.src.set_gain_mode(1)
self.iagc = 1
self.rfgain = 0
else:
self.iagc = 0
self.src.set_gain_mode(0)
self.src.set_gain(self.rfgain)
# may differ from the requested rate
sample_rate = self.src.get_sample_rate()
sys.stderr.write("sample rate: %d\n" % (sample_rate))
symbol_rate = 18000
sps = 2 # output rate will be 36,000
out_sample_rate = symbol_rate * sps
options.low_pass = options.low_pass / 2.0
if sample_rate == 96000: # FunCube Dongle
first_decim = 2
else:
first_decim = 10
self.offset = 0
taps = filter.firdes.low_pass(1.0, sample_rate, options.low_pass, options.low_pass * 0.2, filter.firdes.WIN_HANN)
self.tuner = filter.freq_xlating_fir_filter_ccf(first_decim, taps, self.offset, sample_rate)
self.demod = cqpsk.cqpsk_demod(
samples_per_symbol = sps,
excess_bw=0.35,
costas_alpha=0.03,
gain_mu=0.05,
mu=0.05,
omega_relative_limit=0.05,
log=options.log,
verbose=options.verbose)
self.output = blocks.file_sink(gr.sizeof_float, options.output_file)
rerate = float(sample_rate / float(first_decim)) / float(out_sample_rate)
sys.stderr.write("resampling factor: %f\n" % rerate)
if rerate.is_integer():
sys.stderr.write("using pfb decimator\n")
self.resamp = pfb.decimator_ccf(int(rerate))
else:
sys.stderr.write("using pfb resampler\n")
self.resamp = pfb.arb_resampler_ccf(1 / rerate)
self.connect(self.src, self.tuner, self.resamp, self.demod, self.output)
self.Main = wx.Notebook(self.GetWin(), style=wx.NB_TOP)
self.Main.AddPage(grc_wxgui.Panel(self.Main), "Wideband Spectrum")
self.Main.AddPage(grc_wxgui.Panel(self.Main), "Channel Spectrum")
self.Main.AddPage(grc_wxgui.Panel(self.Main), "Soft Bits")
def set_ifreq(ifreq):
self.ifreq = ifreq
self._ifreq_text_box.set_value(self.ifreq)
self.src.set_center_freq(self.ifreq)
self._ifreq_text_box = forms.text_box(
parent=self.GetWin(),
value=self.ifreq,
callback=set_ifreq,
label="Center Frequency",
converter=forms.float_converter(),
)
self.Add(self._ifreq_text_box)
def set_iagc(iagc):
self.iagc = iagc
self._agc_check_box.set_value(self.iagc)
self.src.set_gain_mode(self.iagc, 0)
self.src.set_gain(0 if self.iagc == 1 else self.rfgain, 0)
self._agc_check_box = forms.check_box(
parent=self.GetWin(),
value=self.iagc,
callback=set_iagc,
label="Automatic Gain",
true=1,
false=0,
)
self.Add(self._agc_check_box)
def set_rfgain(rfgain):
self.rfgain = rfgain
self._rfgain_slider.set_value(self.rfgain)
self._rfgain_text_box.set_value(self.rfgain)
self.src.set_gain(0 if self.iagc == 1 else self.rfgain, 0)
_rfgain_sizer = wx.BoxSizer(wx.VERTICAL)
self._rfgain_text_box = forms.text_box(
parent=self.GetWin(),
sizer=_rfgain_sizer,
value=self.rfgain,
callback=set_rfgain,
label="RF Gain",
converter=forms.float_converter(),
proportion=0,
)
self._rfgain_slider = forms.slider(
parent=self.GetWin(),
sizer=_rfgain_sizer,
value=self.rfgain,
callback=set_rfgain,
minimum=0,
maximum=50,
num_steps=200,
style=wx.SL_HORIZONTAL,
cast=float,
proportion=1,
)
self.Add(_rfgain_sizer)
self.Add(self.Main)
def fftsink2_callback(x, y):
if abs(x / (sample_rate / 2)) > 0.9:
set_ifreq(self.ifreq + x / 2)
else:
sys.stderr.write("coarse tuned to: %d Hz\n" % x)
self.offset = x
self.tuner.set_center_freq(self.offset)
self.scope = fftsink2.fft_sink_c(self.Main.GetPage(0).GetWin(),
title="Wideband Spectrum (click to coarse tune)",
fft_size=1024,
sample_rate=sample_rate,
ref_scale=2.0,
ref_level=0,
y_divs=10,
fft_rate=10,
average=False,
avg_alpha=0.6)
self.Main.GetPage(0).Add(self.scope.win)
self.scope.set_callback(fftsink2_callback)
self.connect(self.src, self.scope)
def fftsink2_callback2(x, y):
self.offset = self.offset - (x / 10)
sys.stderr.write("fine tuned to: %d Hz\n" % self.offset)
self.tuner.set_center_freq(self.offset)
self.scope2 = fftsink2.fft_sink_c(self.Main.GetPage(1).GetWin(),
title="Channel Spectrum (click to fine tune)",
fft_size=1024,
sample_rate=out_sample_rate,
ref_scale=2.0,
ref_level=-20,
y_divs=10,
fft_rate=10,
average=False,
avg_alpha=0.6)
self.Main.GetPage(1).Add(self.scope2.win)
self.scope2.set_callback(fftsink2_callback2)
self.connect(self.resamp, self.scope2)
self.scope3 = scopesink2.scope_sink_f(
self.Main.GetPage(2).GetWin(),
title="Soft Bits",
sample_rate=out_sample_rate,
v_scale=0,
v_offset=0,
t_scale=0.001,
ac_couple=False,
xy_mode=False,
num_inputs=1,
trig_mode=TRIG_MODE_AUTO,
y_axis_label="Counts",
)
self.Main.GetPage(2).Add(self.scope3.win)
self.connect(self.demod, self.scope3)
def get_options():
parser = OptionParser(option_class=eng_option)
parser.add_option("-a", "--args", type="string", default="",
help="gr-osmosdr device arguments")
parser.add_option("-s", "--sample-rate", type="eng_float", default=1800000,
help="set receiver sample rate (default 1800000)")
parser.add_option("-f", "--frequency", type="eng_float", default=394.4e6,
help="set receiver center frequency")
parser.add_option("-g", "--gain", type="eng_float", default=None,
help="set receiver gain")
# demodulator related settings
parser.add_option("-l", "--log", action="store_true", default=False, help="dump debug .dat files")
parser.add_option("-L", "--low-pass", type="eng_float", default=25e3, help="low pass cut-off", metavar="Hz")
parser.add_option("-o", "--output-file", type="string", default="out.float", help="specify the bit output file")
parser.add_option("-v", "--verbose", action="store_true", default=False, help="dump demodulation data")
(options, args) = parser.parse_args()
if len(args) != 0:
parser.print_help()
raise SystemExit, 1
return (options)
if __name__ == '__main__':
tb = top_block()
tb.Run(True)