diff --git a/op25/gr-op25_repeater/apps/http.py b/op25/gr-op25_repeater/apps/http.py index afce204..4a66596 100644 --- a/op25/gr-op25_repeater/apps/http.py +++ b/op25/gr-op25_repeater/apps/http.py @@ -25,12 +25,14 @@ import re import json import socket import traceback +import threading from gnuradio import gr from waitress.server import create_server my_input_q = None my_output_q = None +my_recv_q = None my_port = None """ @@ -62,7 +64,7 @@ def static_file(environ, start_response): return status, content_type, output def post_req(environ, start_response, postdata): - global my_input_q, my_output_q, my_port + global my_input_q, my_output_q, my_recv_q, my_port try: data = json.loads(postdata) msg = gr.message().make_from_string(str(data['command']), -2, data['data'], 0) @@ -74,8 +76,8 @@ def post_req(environ, start_response, postdata): sys.stderr.write('*** end traceback ***\n') resp_msg = [] - while not my_input_q.empty_p(): - msg = my_input_q.delete_head() + while not my_recv_q.empty_p(): + msg = my_recv_q.delete_head() if msg.type() == -4: resp_msg.append(json.loads(msg.to_string())) status = '200 OK' @@ -111,9 +113,16 @@ def application(environ, start_response): sys.exit(1) return result +def process_qmsg(msg): + if my_recv_q.full_p(): + my_recv_q.delete_head_nowait() # ignores result + if my_recv_q.full_p(): + return + my_recv_q.insert_tail(msg) + class http_server(object): def __init__(self, input_q, output_q, endpoint, **kwds): - global my_input_q, my_output_q, my_port + global my_input_q, my_output_q, my_recv_q, my_port host, port = endpoint.split(':') if my_port is not None: raise AssertionError('this server is already active on port %s' % my_port) @@ -121,7 +130,24 @@ class http_server(object): my_output_q = output_q my_port = int(port) + my_recv_q = gr.msg_queue(10) + self.q_watcher = queue_watcher(my_input_q, process_qmsg) + self.server = create_server(application, host=host, port=my_port) def run(self): self.server.run() + +class queue_watcher(threading.Thread): + def __init__(self, msgq, callback, **kwds): + threading.Thread.__init__ (self, **kwds) + self.setDaemon(1) + self.msgq = msgq + self.callback = callback + self.keep_running = True + self.start() + + def run(self): + while(self.keep_running): + msg = self.msgq.delete_head() + self.callback(msg)