http bugfix thx wa8wg for the report
parent
7bdfe78aa0
commit
6b45a95754
|
@ -25,12 +25,14 @@ import re
|
|||
import json
|
||||
import socket
|
||||
import traceback
|
||||
import threading
|
||||
|
||||
from gnuradio import gr
|
||||
from waitress.server import create_server
|
||||
|
||||
my_input_q = None
|
||||
my_output_q = None
|
||||
my_recv_q = None
|
||||
my_port = None
|
||||
|
||||
"""
|
||||
|
@ -62,7 +64,7 @@ def static_file(environ, start_response):
|
|||
return status, content_type, output
|
||||
|
||||
def post_req(environ, start_response, postdata):
|
||||
global my_input_q, my_output_q, my_port
|
||||
global my_input_q, my_output_q, my_recv_q, my_port
|
||||
try:
|
||||
data = json.loads(postdata)
|
||||
msg = gr.message().make_from_string(str(data['command']), -2, data['data'], 0)
|
||||
|
@ -74,8 +76,8 @@ def post_req(environ, start_response, postdata):
|
|||
sys.stderr.write('*** end traceback ***\n')
|
||||
|
||||
resp_msg = []
|
||||
while not my_input_q.empty_p():
|
||||
msg = my_input_q.delete_head()
|
||||
while not my_recv_q.empty_p():
|
||||
msg = my_recv_q.delete_head()
|
||||
if msg.type() == -4:
|
||||
resp_msg.append(json.loads(msg.to_string()))
|
||||
status = '200 OK'
|
||||
|
@ -111,9 +113,16 @@ def application(environ, start_response):
|
|||
sys.exit(1)
|
||||
return result
|
||||
|
||||
def process_qmsg(msg):
|
||||
if my_recv_q.full_p():
|
||||
my_recv_q.delete_head_nowait() # ignores result
|
||||
if my_recv_q.full_p():
|
||||
return
|
||||
my_recv_q.insert_tail(msg)
|
||||
|
||||
class http_server(object):
|
||||
def __init__(self, input_q, output_q, endpoint, **kwds):
|
||||
global my_input_q, my_output_q, my_port
|
||||
global my_input_q, my_output_q, my_recv_q, my_port
|
||||
host, port = endpoint.split(':')
|
||||
if my_port is not None:
|
||||
raise AssertionError('this server is already active on port %s' % my_port)
|
||||
|
@ -121,7 +130,24 @@ class http_server(object):
|
|||
my_output_q = output_q
|
||||
my_port = int(port)
|
||||
|
||||
my_recv_q = gr.msg_queue(10)
|
||||
self.q_watcher = queue_watcher(my_input_q, process_qmsg)
|
||||
|
||||
self.server = create_server(application, host=host, port=my_port)
|
||||
|
||||
def run(self):
|
||||
self.server.run()
|
||||
|
||||
class queue_watcher(threading.Thread):
|
||||
def __init__(self, msgq, callback, **kwds):
|
||||
threading.Thread.__init__ (self, **kwds)
|
||||
self.setDaemon(1)
|
||||
self.msgq = msgq
|
||||
self.callback = callback
|
||||
self.keep_running = True
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
while(self.keep_running):
|
||||
msg = self.msgq.delete_head()
|
||||
self.callback(msg)
|
||||
|
|
Loading…
Reference in New Issue