From c0bbdc9c70869a32a9421c41b9cd0058bcf5dc1f Mon Sep 17 00:00:00 2001 From: Max Date: Tue, 3 Apr 2018 14:40:11 -0400 Subject: [PATCH] http server updates --- op25/gr-op25_repeater/apps/http.py | 95 +++++++++++++++++++++++++----- 1 file changed, 81 insertions(+), 14 deletions(-) diff --git a/op25/gr-op25_repeater/apps/http.py b/op25/gr-op25_repeater/apps/http.py index 2af7e96..38ecba4 100755 --- a/op25/gr-op25_repeater/apps/http.py +++ b/op25/gr-op25_repeater/apps/http.py @@ -1,4 +1,4 @@ -#! /usr/bin/env python +#! /usr/bin/python # Copyright 2017, 2018 Max H. Parke KA1RBI # @@ -28,12 +28,13 @@ import socket import traceback import threading import glob +import subprocess +import zmq from gnuradio import gr from waitress.server import create_server from optparse import OptionParser from multi_rx import byteify -from rx import p25_rx_block my_input_q = None my_output_q = None @@ -181,12 +182,9 @@ def process_qmsg(msg): class http_server(object): def __init__(self, input_q, output_q, endpoint, **kwds): global my_input_q, my_output_q, my_recv_q, my_port - if endpoint == 'internal': - return - else: - host, port = endpoint.split(':') - if my_port is not None: - raise AssertionError('this server is already active on port %s' % my_port) + host, port = endpoint.split(':') + if my_port is not None: + raise AssertionError('this server is already active on port %s' % my_port) my_input_q = input_q my_output_q = output_q my_port = int(port) @@ -222,15 +220,85 @@ class Backend(threading.Thread): self.input_q = input_q self.output_q = output_q self.verbosity = options.verbosity + + self.zmq_context = zmq.Context() + self.zmq_port = options.zmq_port + + self.zmq_sub = self.zmq_context.socket(zmq.SUB) + self.zmq_sub.connect('tcp://localhost:%d' % self.zmq_port) + self.zmq_sub.setsockopt(zmq.SUBSCRIBE, '') + + self.zmq_pub = self.zmq_context.socket(zmq.PUB) + self.zmq_pub.sndhwm = 5 + self.zmq_pub.bind('tcp://*:%d' % (self.zmq_port+1)) + self.start() + self.subproc = None + self.backend = '%s/%s' % (os.getcwd(), 'rx.py') + + def check_subproc(self): # return True if subprocess is active + if not self.subproc: + return False + rc = self.subproc.poll() + if rc is None: + return True + else: + self.subproc.wait() + self.subproc = None + return False def process_msg(self, msg): + def make_command(options): + types = {'costas-alpha': 'float', 'trunk-conf-file': 'str', 'demod-type': 'str', 'logfile-workers': 'int', 'decim-amt': 'int', 'wireshark-host': 'str', 'gain-mu': 'float', 'phase2-tdma': 'bool', 'seek': 'int', 'ifile': 'str', 'pause': 'bool', 'antenna': 'str', 'calibration': 'float', 'fine-tune': 'float', 'raw-symbols': 'str', 'audio-output': 'str', 'vocoder': 'bool', 'input': 'str', 'wireshark': 'bool', 'gains': 'str', 'args': 'str', 'sample-rate': 'int', 'terminal-type': 'str', 'gain': 'float', 'excess-bw': 'float', 'offset': 'float', 'audio-input': 'str', 'audio': 'bool', 'plot-mode': 'str', 'audio-if': 'bool', 'tone-detect': 'bool', 'frequency': 'int', 'freq-corr': 'float', 'hamlib-model': 'int', 'udp-player': 'bool', 'verbosity': 'int'} + opts = [self.backend] + for k in [ x for x in dir(options) if not x.startswith('_') ]: + kw = k.replace('_', '-') + val = getattr(options, k) + if kw not in types.keys(): + print 'make_command: unknown option: %s %s type %s' % (k, val, type(val)) + return None + elif types[kw] == 'str': + if val: + opts.append('--%s' % kw) + opts.append('%s' % (val)) + elif types[kw] == 'float': + opts.append('--%s' % kw) + if val: + opts.append('%f' % (val)) + else: + opts.append('%f' % (0)) + elif types[kw] == 'int': + opts.append('--%s' % kw) + if val: + opts.append('%d' % (val)) + else: + opts.append('%d' % (0)) + elif types[kw] == 'bool': + if val: + opts.append('--%s' % kw) + else: + print 'make_command: unknown2 option: %s %s type %s' % (k, val, type(val)) + return None + return opts + msg = json.loads(msg.to_string()) if msg['command'] == 'rx-start': + if self.check_subproc(): + sys.stderr.write('command failed: subprocess pid %d already active\n' % self.subproc.pid) + return options = rx_options(msg['data']) options.verbosity = self.verbosity - options._js_config['config-rx-data'] = {'input_q': self.input_q, 'output_q': self.output_q} - self.tb = p25_rx_block(options) + options.terminal_type = 'zmq:tcp:%d' % (self.zmq_port) + cmd = make_command(options) + self.subproc = subprocess.Popen(cmd) + elif msg['command'] == 'rx-stop': + if not self.check_subproc(): + sys.stderr.write('command failed: subprocess not active\n') + return + if msg['data'] == 'kill': + self.subproc.kill() + else: + self.subproc.terminate() def run(self): while self.keep_running: @@ -257,14 +325,12 @@ class rx_options(object): setattr(self, map_name(k), config['backend-rx'][k]) for k in 'args frequency gains offset'.split(): setattr(self, k, dev[k]) - for k in 'demod_type filter_type'.split(): - setattr(self, k, chan[k]) + self.demod_type = chan['demod_type'] self.freq_corr = dev['ppm'] self.sample_rate = dev['rate'] self.plot_mode = chan['plot'] self.phase2_tdma = chan['phase2_tdma'] - self.trunk_conf_file = None - self.terminal_type = None + self.trunk_conf_file = "" self._js_config = config def http_main(): @@ -275,6 +341,7 @@ def http_main(): parser.add_option("-e", "--endpoint", type="string", default="127.0.0.1:8080", help="address:port to listen on (use addr 0.0.0.0 to enable external clients)") parser.add_option("-v", "--verbosity", type="int", default=0, help="message debug level") parser.add_option("-p", "--pause", action="store_true", default=False, help="block on startup") + parser.add_option("-z", "--zmq-port", type="int", default=25000, help="backend sub port") (options, args) = parser.parse_args() # wait for gdb