From 05c71259f60dea6f49b846285c5935de206c8a51 Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Mon, 28 Mar 2022 02:43:55 +0300 Subject: [PATCH] DbgMuxPeer: spawn two threads for handling Rx/Tx messages --- peer.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++- sedbgmux.py | 2 ++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/peer.py b/peer.py index f6b0606..ae17614 100644 --- a/peer.py +++ b/peer.py @@ -17,6 +17,8 @@ # along with this program. If not, see . import logging as log +import threading +import queue from typing import Any, Optional from construct import Const, Container, Int16ul @@ -31,7 +33,54 @@ class DbgMuxPeer: self.rx_count: int = 0 self.io = io + # Threads handling Rx/Tx frames + self._rx_thread = threading.Thread(target=self._rx_worker, + name='DbgMuxPeer-Rx', + daemon=True) + self._tx_thread = threading.Thread(target=self._tx_worker, + name='DbgMuxPeer-Tx', + daemon=True) + + self._shutdown = threading.Event() + + # Internal queues for Rx/Tx frames + self._rx_queue = queue.Queue() + self._tx_queue = queue.Queue() + + def start(self) -> None: + self._shutdown.clear() + self._rx_thread.start() + self._tx_thread.start() + + def stop(self) -> None: + # Set the shutdown event + self._shutdown.set() + # Wait for both threads to terminate + self._tx_thread.join() + self._rx_thread.join() + + def _rx_worker(self) -> None: + while not self._shutdown.is_set(): + frame = self._recv() # blocking until timeout + if frame is not None: + self._rx_queue.put(frame) + log.debug('Thread \'%s\' is shutting down', threading.current_thread().name) + + def _tx_worker(self) -> None: + while not self._shutdown.is_set(): + try: + (msg_type, msg) = self._tx_queue.get(block=True, timeout=0.5) + self._send(msg_type, msg) + self._tx_queue.task_done() + except queue.Empty: + pass + log.debug('Thread \'%s\' is shutting down', threading.current_thread().name) + def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: + ''' Send a single message (non-blocking call) ''' + self._tx_queue.put((msg_type, msg)) + + def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: # Encode the inner message first msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type) @@ -63,7 +112,11 @@ class DbgMuxPeer: if msg_type != DbgMuxFrame.MsgType.Ack: self.tx_count += 1 - def recv(self) -> Optional[Container]: + def recv(self, timeout: Optional[float] = None) -> Container: + ''' Receive a single message (blocking call) ''' + return self._rx_queue.get(block=True, timeout=timeout) + + def _recv(self) -> Optional[Container]: frame: bytes = b'' frame += self.io.read(2) # Magic if frame == b'': diff --git a/sedbgmux.py b/sedbgmux.py index 675e35b..1badd46 100755 --- a/sedbgmux.py +++ b/sedbgmux.py @@ -63,11 +63,13 @@ class SEDbgMuxApp(cmd2.Cmd): def do_connect(self, opts) -> None: ''' Connect to the modem and switch it to DebugMux mode ''' self.transport.connect() + self.peer.start() self.set_connected(True) @cmd2.with_category(CATEGORY_CONN) def do_disconnect(self, opts) -> None: ''' Disconnect from the modem ''' + self.peer.stop() self.transport.disconnect() self.set_connected(False)