|
|
|
@ -17,6 +17,8 @@ |
|
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
|
|
|
|
|
|
|
|
import logging as log |
|
|
|
|
import threading |
|
|
|
|
import queue |
|
|
|
|
|
|
|
|
|
from typing import Any |
|
|
|
|
from construct import Const, Container, Int16ul |
|
|
|
@ -31,7 +33,49 @@ class DbgMuxPeer: |
|
|
|
|
self.rx_count: int = 0 |
|
|
|
|
self.io = io |
|
|
|
|
|
|
|
|
|
# Threads handling Rx/Tx frames |
|
|
|
|
self._rx_thread = threading.Thread(target=self._rx_worker, |
|
|
|
|
name='DbgMuxPeer-Rx', |
|
|
|
|
daemon=True) |
|
|
|
|
self._tx_thread = threading.Thread(target=self._tx_worker, |
|
|
|
|
name='DbgMuxPeer-Tx', |
|
|
|
|
daemon=True) |
|
|
|
|
|
|
|
|
|
self._shutdown = threading.Event() |
|
|
|
|
|
|
|
|
|
# Internal queues for Rx/Tx frames |
|
|
|
|
self._rx_queue = queue.Queue() |
|
|
|
|
self._tx_queue = queue.Queue() |
|
|
|
|
|
|
|
|
|
def start(self) -> None: |
|
|
|
|
self._shutdown.clear() |
|
|
|
|
self._rx_thread.start() |
|
|
|
|
self._tx_thread.start() |
|
|
|
|
|
|
|
|
|
def stop(self) -> None: |
|
|
|
|
# Wait for all Tx messages to be processed |
|
|
|
|
self._tx_queue.join() |
|
|
|
|
# Set the shutdown event |
|
|
|
|
self._shutdown.set() |
|
|
|
|
|
|
|
|
|
def _rx_worker(self) -> None: |
|
|
|
|
while not self._shutdown.is_set(): |
|
|
|
|
frame = self._recv() # blocking |
|
|
|
|
self._rx_queue.put(frame) |
|
|
|
|
log.debug('Thread \'%s\' is shutting down', threading.currentThread().name) |
|
|
|
|
|
|
|
|
|
def _tx_worker(self) -> None: |
|
|
|
|
while not self._shutdown.is_set(): |
|
|
|
|
(msg_type, msg) = self._tx_queue.get(block=True) |
|
|
|
|
self._send(msg_type, msg) |
|
|
|
|
self._tx_queue.task_done() |
|
|
|
|
log.debug('Thread \'%s\' is shutting down', threading.currentThread().name) |
|
|
|
|
|
|
|
|
|
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: |
|
|
|
|
''' Send a single message (non-blocking call) ''' |
|
|
|
|
self._tx_queue.put((msg_type, msg)) |
|
|
|
|
|
|
|
|
|
def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: |
|
|
|
|
# Encode the inner message first |
|
|
|
|
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type) |
|
|
|
|
|
|
|
|
@ -64,6 +108,10 @@ class DbgMuxPeer: |
|
|
|
|
self.tx_count += 1 |
|
|
|
|
|
|
|
|
|
def recv(self) -> Container: |
|
|
|
|
''' Receive a single message (blocking call) ''' |
|
|
|
|
return self._rx_queue.get(block=True) |
|
|
|
|
|
|
|
|
|
def _recv(self) -> Container: |
|
|
|
|
frame: bytes = b'' |
|
|
|
|
frame += self.io.read(2) # Magic |
|
|
|
|
Const(b'\x42\x42').parse(frame[:2]) |
|
|
|
|