http server updates

This commit is contained in:
Max 2018-04-03 14:40:11 -04:00
parent 85d09681fe
commit c0bbdc9c70
1 changed files with 81 additions and 14 deletions

View File

@ -1,4 +1,4 @@
#! /usr/bin/env python
#! /usr/bin/python
# Copyright 2017, 2018 Max H. Parke KA1RBI
#
@ -28,12 +28,13 @@ import socket
import traceback
import threading
import glob
import subprocess
import zmq
from gnuradio import gr
from waitress.server import create_server
from optparse import OptionParser
from multi_rx import byteify
from rx import p25_rx_block
my_input_q = None
my_output_q = None
@ -181,12 +182,9 @@ def process_qmsg(msg):
class http_server(object):
def __init__(self, input_q, output_q, endpoint, **kwds):
global my_input_q, my_output_q, my_recv_q, my_port
if endpoint == 'internal':
return
else:
host, port = endpoint.split(':')
if my_port is not None:
raise AssertionError('this server is already active on port %s' % my_port)
host, port = endpoint.split(':')
if my_port is not None:
raise AssertionError('this server is already active on port %s' % my_port)
my_input_q = input_q
my_output_q = output_q
my_port = int(port)
@ -222,15 +220,85 @@ class Backend(threading.Thread):
self.input_q = input_q
self.output_q = output_q
self.verbosity = options.verbosity
self.zmq_context = zmq.Context()
self.zmq_port = options.zmq_port
self.zmq_sub = self.zmq_context.socket(zmq.SUB)
self.zmq_sub.connect('tcp://localhost:%d' % self.zmq_port)
self.zmq_sub.setsockopt(zmq.SUBSCRIBE, '')
self.zmq_pub = self.zmq_context.socket(zmq.PUB)
self.zmq_pub.sndhwm = 5
self.zmq_pub.bind('tcp://*:%d' % (self.zmq_port+1))
self.start()
self.subproc = None
self.backend = '%s/%s' % (os.getcwd(), 'rx.py')
def check_subproc(self): # return True if subprocess is active
if not self.subproc:
return False
rc = self.subproc.poll()
if rc is None:
return True
else:
self.subproc.wait()
self.subproc = None
return False
def process_msg(self, msg):
def make_command(options):
types = {'costas-alpha': 'float', 'trunk-conf-file': 'str', 'demod-type': 'str', 'logfile-workers': 'int', 'decim-amt': 'int', 'wireshark-host': 'str', 'gain-mu': 'float', 'phase2-tdma': 'bool', 'seek': 'int', 'ifile': 'str', 'pause': 'bool', 'antenna': 'str', 'calibration': 'float', 'fine-tune': 'float', 'raw-symbols': 'str', 'audio-output': 'str', 'vocoder': 'bool', 'input': 'str', 'wireshark': 'bool', 'gains': 'str', 'args': 'str', 'sample-rate': 'int', 'terminal-type': 'str', 'gain': 'float', 'excess-bw': 'float', 'offset': 'float', 'audio-input': 'str', 'audio': 'bool', 'plot-mode': 'str', 'audio-if': 'bool', 'tone-detect': 'bool', 'frequency': 'int', 'freq-corr': 'float', 'hamlib-model': 'int', 'udp-player': 'bool', 'verbosity': 'int'}
opts = [self.backend]
for k in [ x for x in dir(options) if not x.startswith('_') ]:
kw = k.replace('_', '-')
val = getattr(options, k)
if kw not in types.keys():
print 'make_command: unknown option: %s %s type %s' % (k, val, type(val))
return None
elif types[kw] == 'str':
if val:
opts.append('--%s' % kw)
opts.append('%s' % (val))
elif types[kw] == 'float':
opts.append('--%s' % kw)
if val:
opts.append('%f' % (val))
else:
opts.append('%f' % (0))
elif types[kw] == 'int':
opts.append('--%s' % kw)
if val:
opts.append('%d' % (val))
else:
opts.append('%d' % (0))
elif types[kw] == 'bool':
if val:
opts.append('--%s' % kw)
else:
print 'make_command: unknown2 option: %s %s type %s' % (k, val, type(val))
return None
return opts
msg = json.loads(msg.to_string())
if msg['command'] == 'rx-start':
if self.check_subproc():
sys.stderr.write('command failed: subprocess pid %d already active\n' % self.subproc.pid)
return
options = rx_options(msg['data'])
options.verbosity = self.verbosity
options._js_config['config-rx-data'] = {'input_q': self.input_q, 'output_q': self.output_q}
self.tb = p25_rx_block(options)
options.terminal_type = 'zmq:tcp:%d' % (self.zmq_port)
cmd = make_command(options)
self.subproc = subprocess.Popen(cmd)
elif msg['command'] == 'rx-stop':
if not self.check_subproc():
sys.stderr.write('command failed: subprocess not active\n')
return
if msg['data'] == 'kill':
self.subproc.kill()
else:
self.subproc.terminate()
def run(self):
while self.keep_running:
@ -257,14 +325,12 @@ class rx_options(object):
setattr(self, map_name(k), config['backend-rx'][k])
for k in 'args frequency gains offset'.split():
setattr(self, k, dev[k])
for k in 'demod_type filter_type'.split():
setattr(self, k, chan[k])
self.demod_type = chan['demod_type']
self.freq_corr = dev['ppm']
self.sample_rate = dev['rate']
self.plot_mode = chan['plot']
self.phase2_tdma = chan['phase2_tdma']
self.trunk_conf_file = None
self.terminal_type = None
self.trunk_conf_file = ""
self._js_config = config
def http_main():
@ -275,6 +341,7 @@ def http_main():
parser.add_option("-e", "--endpoint", type="string", default="127.0.0.1:8080", help="address:port to listen on (use addr 0.0.0.0 to enable external clients)")
parser.add_option("-v", "--verbosity", type="int", default=0, help="message debug level")
parser.add_option("-p", "--pause", action="store_true", default=False, help="block on startup")
parser.add_option("-z", "--zmq-port", type="int", default=25000, help="backend sub port")
(options, args) = parser.parse_args()
# wait for gdb