This repository has been archived on 2022-02-17. You can view files and clone it, but cannot push or open issues or pull requests.
op25-legacy/python/op25_tx.py

272 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# Copyright 2005,2006,2007 Free Software Foundation, Inc.
#
# GNU Radio Multichannel APCO P25 Tx (c) Copyright 2009, KA1RBI
#
# This file is part of GNU Radio and part of OP25
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# It is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
"""
Transmit N simultaneous narrow band P25 C4FM signals.
They will be centered at the frequency specified on the command line,
and are spaced at 25kHz steps from there.
There are three main ways to run this program:
A. Combined audio capture, speech coding, and USRP transmission [default]
B. USRP transmission only, receives P25 symbol input via UDP channel [-p]
C. USRP transmission only, P25 symbol input taken from file(s) [-i]
By using method B, the coding and transmission functions can be
split over separate machines. Run the op25_remote_tx script prior to
running this script when using method B.
"""
from gnuradio import gr, eng_notation, repeater
from gnuradio import usrp
from gnuradio import audio
from gnuradio import blks2
from gnuradio.eng_option import eng_option
from optparse import OptionParser
from usrpm import usrp_dbid
import math
import sys
from gnuradio.wxgui import stdgui2, fftsink2
#from gnuradio import tx_debug_gui
import wx
import op25_c4fm_mod
########################################################
# instantiate one transmit chain for each call
class pipeline(gr.hier_block2):
def __init__(self, vocoder, lo_freq, audio_rate, if_rate):
gr.hier_block2.__init__(self, "pipeline",
gr.io_signature(0, 0, 0), # Input signature
gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
c4fm = op25_c4fm_mod.p25_mod_bf(output_sample_rate=audio_rate,
log=False,
verbose=True)
interp_factor = if_rate / audio_rate
low_pass = 2.88e3
interp_taps = gr.firdes.low_pass(1.0, if_rate, low_pass, low_pass * 0.1, gr.firdes.WIN_HANN)
interpolator = gr.interp_fir_filter_fff (int(interp_factor), interp_taps)
max_dev = 12.5e3
k = 2 * math.pi * max_dev / if_rate
adjustment = 1.5 # adjust for proper c4fm deviation level
modulator = gr.frequency_modulator_fc (k * adjustment)
# Local oscillator
lo = gr.sig_source_c (if_rate, # sample rate
gr.GR_SIN_WAVE, # waveform type
lo_freq, #frequency
1.0, # amplitude
0) # DC Offset
mixer = gr.multiply_cc ()
self.connect (vocoder, c4fm, interpolator, modulator, (mixer, 0))
self.connect (lo, (mixer, 1))
self.connect (mixer, self)
class fm_tx_block(stdgui2.std_top_block):
def __init__(self, frame, panel, vbox, argv):
MAX_CHANNELS = 7
stdgui2.std_top_block.__init__ (self, frame, panel, vbox, argv)
parser = OptionParser (option_class=eng_option)
parser.add_option("-T", "--tx-subdev-spec", type="subdev", default=None,
help="select USRP Tx side A or B")
parser.add_option("-e","--enable-fft", action="store_true", default=False,
help="enable spectrum plot (and use more CPU)")
parser.add_option("-f", "--freq", type="eng_float", default=None,
help="set Tx frequency to FREQ [required]", metavar="FREQ")
parser.add_option("-i","--file-input", action="store_true", default=False,
help="input from baseband-0.dat, baseband-1.dat ...")
parser.add_option("-g", "--audio-gain", type="eng_float", default=1.0,
help="input audio gain multiplier")
parser.add_option("-n", "--nchannels", type="int", default=2,
help="number of Tx channels [1,4]")
parser.add_option("-a", "--udp-addr", type="string", default="127.0.0.1",
help="UDP host IP address")
parser.add_option("-p", "--udp-port", type="int", default=0,
help="UDP port number")
parser.add_option("-r","--repeat", action="store_true", default=False,
help="continuously replay input file")
parser.add_option("-S", "--stretch", type="int", default=0,
help="elastic buffer trigger value")
parser.add_option("-v","--verbose", action="store_true", default=False,
help="print out stats")
parser.add_option("-I", "--audio-input", type="string", default="", help="pcm input device name. E.g., hw:0,0 or /dev/dsp")
(options, args) = parser.parse_args ()
if len(args) != 0:
parser.print_help()
sys.exit(1)
if options.nchannels < 1 or options.nchannels > MAX_CHANNELS:
sys.stderr.write ("op25_tx: nchannels out of range. Must be in [1,%d]\n" % MAX_CHANNELS)
sys.exit(1)
if options.freq is None:
sys.stderr.write("op25_tx: must specify frequency with -f FREQ\n")
parser.print_help()
sys.exit(1)
# ----------------------------------------------------------------
# Set up constants and parameters
self.u = usrp.sink_c () # the USRP sink (consumes samples)
self.dac_rate = self.u.dac_rate() # 128 MS/s
self.usrp_interp = 400
self.u.set_interp_rate(self.usrp_interp)
self.usrp_rate = self.dac_rate / self.usrp_interp # 320 kS/s
self.sw_interp = 10
self.audio_rate = self.usrp_rate / self.sw_interp # 32 kS/s
# determine the daughterboard subdevice we're using
if options.tx_subdev_spec is None:
options.tx_subdev_spec = usrp.pick_tx_subdevice(self.u)
m = usrp.determine_tx_mux_value(self.u, options.tx_subdev_spec)
#print "mux = %#04x" % (m,)
self.u.set_mux(m)
self.subdev = usrp.selected_subdev(self.u, options.tx_subdev_spec)
print "Using TX d'board %s" % (self.subdev.side_and_name(),)
self.subdev.set_gain(self.subdev.gain_range()[0]) # set min Tx gain
if not self.set_freq(options.freq):
freq_range = self.subdev.freq_range()
print "Failed to set frequency to %s. Daughterboard supports %s to %s" % (
eng_notation.num_to_str(options.freq),
eng_notation.num_to_str(freq_range[0]),
eng_notation.num_to_str(freq_range[1]))
raise SystemExit
self.subdev.set_enable(True) # enable transmitter
# instantiate vocoders
self.vocoders = []
if options.file_input:
for i in range (options.nchannels):
t = gr.file_source(gr.sizeof_char, "baseband-%d.dat" % i, options.repeat)
self.vocoders.append(t)
elif options.udp_port > 0:
self.udp_sources = []
for i in range (options.nchannels):
t = gr.udp_source(1, options.udp_addr, options.udp_port + i, 216)
self.udp_sources.append(t)
arity = 2
t = gr.packed_to_unpacked_bb(arity, gr.GR_MSB_FIRST)
self.vocoders.append(t)
self.connect(self.udp_sources[i], self.vocoders[i])
else:
self.audio_amps = []
self.converters = []
input_audio_rate = 8000
self.audio_input = audio.source(input_audio_rate, options.audio_input)
for i in range (options.nchannels):
t = gr.multiply_const_ff(32767 * options.audio_gain)
self.audio_amps.append(t)
t = gr.float_to_short()
self.converters.append(t)
t = repeater.vocoder(True, # 0=Decode,True=Encode
options.verbose, # Verbose flag
options.stretch, # flex amount
"", # udp ip address
0, # udp port
False) # dump raw u vectors
self.vocoders.append(t)
self.connect((self.audio_input, i), self.audio_amps[i], self.converters[i], self.vocoders[i])
sum = gr.add_cc ()
# Instantiate N NBFM channels
step = 25e3
offset = (0 * step, 1 * step, -1 * step, 2 * step, -2 * step, 3 * step, -3 * step)
for i in range (options.nchannels):
t = pipeline(self.vocoders[i], offset[i],
self.audio_rate, self.usrp_rate)
self.connect(t, (sum, i))
gain = gr.multiply_const_cc (4000.0 / options.nchannels)
# connect it all
self.connect (sum, gain)
self.connect (gain, self.u)
# plot an FFT to verify we are sending what we want
if options.enable_fft:
post_mod = fftsink2.fft_sink_c(panel, title="Post Modulation",
fft_size=512, sample_rate=self.usrp_rate,
y_per_div=20, ref_level=40)
self.connect (sum, post_mod)
vbox.Add (post_mod.win, 1, wx.EXPAND)
#if options.debug:
# self.debugger = tx_debug_gui.tx_debug_gui(self.subdev)
# self.debugger.Show(True)
def set_freq(self, target_freq):
"""
Set the center frequency we're interested in.
@param target_freq: frequency in Hz
@rypte: bool
Tuning is a two step process. First we ask the front-end to
tune as close to the desired frequency as it can. Then we use
the result of that operation and our target_frequency to
determine the value for the digital up converter. Finally, we feed
any residual_freq to the s/w freq translater.
"""
r = self.u.tune(self.subdev.which(), self.subdev, target_freq)
if r:
print "r.baseband_freq =", eng_notation.num_to_str(r.baseband_freq)
print "r.dxc_freq =", eng_notation.num_to_str(r.dxc_freq)
print "r.residual_freq =", eng_notation.num_to_str(r.residual_freq)
print "r.inverted =", r.inverted
# Could use residual_freq in s/w freq translator
return True
return False
def main ():
sys.stderr.write("GNU Radio Multichannel APCO P25 Tx (c) Copyright 2009, KA1RBI\n")
app = stdgui2.stdapp(fm_tx_block, "Multichannel APCO P25 Tx", nstatus=1)
app.MainLoop ()
if __name__ == '__main__':
main ()