|
|
|
@ -207,6 +207,53 @@ class curses_terminal(threading.Thread): |
|
|
|
|
self.keep_running = False |
|
|
|
|
self.send_command('quit', 0) |
|
|
|
|
|
|
|
|
|
class zeromq_terminal(threading.Thread): |
|
|
|
|
def __init__(self, input_q, output_q, endpoint, **kwds): |
|
|
|
|
import zmq |
|
|
|
|
threading.Thread.__init__ (self, **kwds) |
|
|
|
|
self.setDaemon(1) |
|
|
|
|
self.input_q = input_q |
|
|
|
|
self.output_q = output_q |
|
|
|
|
self.endpoint = endpoint |
|
|
|
|
self.keep_running = True |
|
|
|
|
|
|
|
|
|
if not endpoint.startswith('tcp:'): |
|
|
|
|
sys.stderr.write('zeromq_terminal unsupported endpoint: %s\n' % endpoint) |
|
|
|
|
return |
|
|
|
|
port = endpoint.replace('tcp:', '') |
|
|
|
|
port = int(port) |
|
|
|
|
|
|
|
|
|
self.zmq_context = zmq.Context() |
|
|
|
|
|
|
|
|
|
self.zmq_sub = self.zmq_context.socket(zmq.SUB) |
|
|
|
|
self.zmq_sub.connect('tcp://localhost:%d' % (port+1)) |
|
|
|
|
self.zmq_sub.setsockopt(zmq.SUBSCRIBE, '') |
|
|
|
|
|
|
|
|
|
self.zmq_pub = self.zmq_context.socket(zmq.PUB) |
|
|
|
|
self.zmq_pub.sndhwm = 5 |
|
|
|
|
self.zmq_pub.bind('tcp://*:%d' % port) |
|
|
|
|
|
|
|
|
|
self.queue_watcher = q_watcher(self.input_q, self.process_qmsg) |
|
|
|
|
self.start() |
|
|
|
|
|
|
|
|
|
def end_terminal(self): |
|
|
|
|
self.keep_running = False |
|
|
|
|
|
|
|
|
|
def process_qmsg(self, msg): |
|
|
|
|
msg = self.input_q.delete_head() |
|
|
|
|
self.zmq_pub.send(msg.to_string()) |
|
|
|
|
|
|
|
|
|
def run(self): |
|
|
|
|
while self.keep_running: |
|
|
|
|
js = self.zmq_sub.recv() |
|
|
|
|
if not self.keep_running: |
|
|
|
|
break |
|
|
|
|
msg = gr.message().make_from_string(js, -4, 0, 0) |
|
|
|
|
if self.output_q.full_p(): |
|
|
|
|
self.output_q.delete_head() |
|
|
|
|
if not self.output_q.full_p(): |
|
|
|
|
self.output_q.insert_tail(msg) |
|
|
|
|
|
|
|
|
|
class http_terminal(threading.Thread): |
|
|
|
|
def __init__(self, input_q, output_q, endpoint, **kwds): |
|
|
|
|
from http import http_server |
|
|
|
@ -273,6 +320,8 @@ class udp_terminal(threading.Thread): |
|
|
|
|
def op25_terminal(input_q, output_q, terminal_type): |
|
|
|
|
if terminal_type == 'curses': |
|
|
|
|
return curses_terminal(input_q, output_q) |
|
|
|
|
elif terminal_type.startswith('zmq:'): |
|
|
|
|
return zeromq_terminal(input_q, output_q, terminal_type.replace('zmq:', '')) |
|
|
|
|
elif terminal_type[0].isdigit(): |
|
|
|
|
port = int(terminal_type) |
|
|
|
|
return udp_terminal(input_q, output_q, port) |
|
|
|
|