DbgMuxPeer: spawn two threads for handling Rx/Tx messages

This commit is contained in:
Vadim Yanitskiy 2022-03-28 02:43:55 +03:00 committed by Vadim Yanitskiy
parent 2990cc14b8
commit 05c71259f6
2 changed files with 56 additions and 1 deletions

55
peer.py
View File

@ -17,6 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log import logging as log
import threading
import queue
from typing import Any, Optional from typing import Any, Optional
from construct import Const, Container, Int16ul from construct import Const, Container, Int16ul
@ -31,7 +33,54 @@ class DbgMuxPeer:
self.rx_count: int = 0 self.rx_count: int = 0
self.io = io self.io = io
# Threads handling Rx/Tx frames
self._rx_thread = threading.Thread(target=self._rx_worker,
name='DbgMuxPeer-Rx',
daemon=True)
self._tx_thread = threading.Thread(target=self._tx_worker,
name='DbgMuxPeer-Tx',
daemon=True)
self._shutdown = threading.Event()
# Internal queues for Rx/Tx frames
self._rx_queue = queue.Queue()
self._tx_queue = queue.Queue()
def start(self) -> None:
self._shutdown.clear()
self._rx_thread.start()
self._tx_thread.start()
def stop(self) -> None:
# Set the shutdown event
self._shutdown.set()
# Wait for both threads to terminate
self._tx_thread.join()
self._rx_thread.join()
def _rx_worker(self) -> None:
while not self._shutdown.is_set():
frame = self._recv() # blocking until timeout
if frame is not None:
self._rx_queue.put(frame)
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _tx_worker(self) -> None:
while not self._shutdown.is_set():
try:
(msg_type, msg) = self._tx_queue.get(block=True, timeout=0.5)
self._send(msg_type, msg)
self._tx_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
''' Send a single message (non-blocking call) '''
self._tx_queue.put((msg_type, msg))
def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first # Encode the inner message first
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type) msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)
@ -63,7 +112,11 @@ class DbgMuxPeer:
if msg_type != DbgMuxFrame.MsgType.Ack: if msg_type != DbgMuxFrame.MsgType.Ack:
self.tx_count += 1 self.tx_count += 1
def recv(self) -> Optional[Container]: def recv(self, timeout: Optional[float] = None) -> Container:
''' Receive a single message (blocking call) '''
return self._rx_queue.get(block=True, timeout=timeout)
def _recv(self) -> Optional[Container]:
frame: bytes = b'' frame: bytes = b''
frame += self.io.read(2) # Magic frame += self.io.read(2) # Magic
if frame == b'': if frame == b'':

View File

@ -63,11 +63,13 @@ class SEDbgMuxApp(cmd2.Cmd):
def do_connect(self, opts) -> None: def do_connect(self, opts) -> None:
''' Connect to the modem and switch it to DebugMux mode ''' ''' Connect to the modem and switch it to DebugMux mode '''
self.transport.connect() self.transport.connect()
self.peer.start()
self.set_connected(True) self.set_connected(True)
@cmd2.with_category(CATEGORY_CONN) @cmd2.with_category(CATEGORY_CONN)
def do_disconnect(self, opts) -> None: def do_disconnect(self, opts) -> None:
''' Disconnect from the modem ''' ''' Disconnect from the modem '''
self.peer.stop()
self.transport.disconnect() self.transport.disconnect()
self.set_connected(False) self.set_connected(False)