DbgMuxPeer: spawn two threads for handling Rx/Tx messages

Vadim Yanitskiy 8 months ago
parent 138af0da7a
commit 13d7a437ca
  1. 48
      peer.py
  2. 2
      sedbgmux.py

@ -17,6 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import queue
from typing import Any
from construct import Const, Container, Int16ul
@ -31,7 +33,49 @@ class DbgMuxPeer:
self.rx_count: int = 0
self.io = io
# Threads handling Rx/Tx frames
self._rx_thread = threading.Thread(target=self._rx_worker,
name='DbgMuxPeer-Rx',
daemon=True)
self._tx_thread = threading.Thread(target=self._tx_worker,
name='DbgMuxPeer-Tx',
daemon=True)
self._shutdown = threading.Event()
# Internal queues for Rx/Tx frames
self._rx_queue = queue.Queue()
self._tx_queue = queue.Queue()
def start(self) -> None:
self._shutdown.clear()
self._rx_thread.start()
self._tx_thread.start()
def stop(self) -> None:
# Wait for all Tx messages to be processed
self._tx_queue.join()
# Set the shutdown event
self._shutdown.set()
def _rx_worker(self) -> None:
while not self._shutdown.is_set():
frame = self._recv() # blocking
self._rx_queue.put(frame)
log.debug('Thread \'%s\' is shutting down', threading.currentThread().name)
def _tx_worker(self) -> None:
while not self._shutdown.is_set():
(msg_type, msg) = self._tx_queue.get(block=True)
self._send(msg_type, msg)
self._tx_queue.task_done()
log.debug('Thread \'%s\' is shutting down', threading.currentThread().name)
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
''' Send a single message (non-blocking call) '''
self._tx_queue.put((msg_type, msg))
def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)
@ -64,6 +108,10 @@ class DbgMuxPeer:
self.tx_count += 1
def recv(self) -> Container:
''' Receive a single message (blocking call) '''
return self._rx_queue.get(block=True)
def _recv(self) -> Container:
frame: bytes = b''
frame += self.io.read(2) # Magic
Const(b'\x42\x42').parse(frame[:2])

@ -63,11 +63,13 @@ class SEDbgMuxApp(cmd2.Cmd):
def do_connect(self, opts) -> None:
''' Connect to the modem and switch it to DebugMux mode '''
self.transport.connect()
self.peer.start()
self.set_connected(True)
@cmd2.with_category(CATEGORY_CONN)
def do_disconnect(self, opts) -> None:
''' Disconnect from the modem '''
self.peer.stop()
self.transport.disconnect()
self.set_connected(False)

Loading…
Cancel
Save