From e0eabf10f047f70e017510ed9b2ee0b8a808f161 Mon Sep 17 00:00:00 2001 From: Max Date: Tue, 3 Apr 2018 15:18:58 -0400 Subject: [PATCH] http server updates --- op25/gr-op25_repeater/apps/terminal.py | 49 ++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/op25/gr-op25_repeater/apps/terminal.py b/op25/gr-op25_repeater/apps/terminal.py index 84f7ac3..cbc0114 100755 --- a/op25/gr-op25_repeater/apps/terminal.py +++ b/op25/gr-op25_repeater/apps/terminal.py @@ -207,6 +207,53 @@ class curses_terminal(threading.Thread): self.keep_running = False self.send_command('quit', 0) +class zeromq_terminal(threading.Thread): + def __init__(self, input_q, output_q, endpoint, **kwds): + import zmq + threading.Thread.__init__ (self, **kwds) + self.setDaemon(1) + self.input_q = input_q + self.output_q = output_q + self.endpoint = endpoint + self.keep_running = True + + if not endpoint.startswith('tcp:'): + sys.stderr.write('zeromq_terminal unsupported endpoint: %s\n' % endpoint) + return + port = endpoint.replace('tcp:', '') + port = int(port) + + self.zmq_context = zmq.Context() + + self.zmq_sub = self.zmq_context.socket(zmq.SUB) + self.zmq_sub.connect('tcp://localhost:%d' % (port+1)) + self.zmq_sub.setsockopt(zmq.SUBSCRIBE, '') + + self.zmq_pub = self.zmq_context.socket(zmq.PUB) + self.zmq_pub.sndhwm = 5 + self.zmq_pub.bind('tcp://*:%d' % port) + + self.queue_watcher = q_watcher(self.input_q, self.process_qmsg) + self.start() + + def end_terminal(self): + self.keep_running = False + + def process_qmsg(self, msg): + msg = self.input_q.delete_head() + self.zmq_pub.send(msg.to_string()) + + def run(self): + while self.keep_running: + js = self.zmq_sub.recv() + if not self.keep_running: + break + msg = gr.message().make_from_string(js, -4, 0, 0) + if self.output_q.full_p(): + self.output_q.delete_head() + if not self.output_q.full_p(): + self.output_q.insert_tail(msg) + class http_terminal(threading.Thread): def __init__(self, input_q, output_q, endpoint, **kwds): from http import http_server @@ -273,6 +320,8 @@ class udp_terminal(threading.Thread): def op25_terminal(input_q, output_q, terminal_type): if terminal_type == 'curses': return curses_terminal(input_q, output_q) + elif terminal_type.startswith('zmq:'): + return zeromq_terminal(input_q, output_q, terminal_type.replace('zmq:', '')) elif terminal_type[0].isdigit(): port = int(terminal_type) return udp_terminal(input_q, output_q, port)