216 lines
7.6 KiB
Python
216 lines
7.6 KiB
Python
# This file is a part of sedbgmux, an open source DebugMux client.
|
|
# Copyright (c) 2022 Vadim Yanitskiy <axilirator@gmail.com>
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but
|
|
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import logging as log
|
|
import threading
|
|
import queue
|
|
|
|
from typing import Any, Optional
|
|
from construct import Const, Container, Int16ul
|
|
|
|
from handlers import DbgMuxFrameHandler
|
|
from transport import Transport
|
|
from proto import DbgMuxFrame
|
|
|
|
|
|
class DbgMuxFrameDisp:
    ''' DebugMux frame dispatcher

    A background thread takes frames from the Rx queue and offers each of
    them to the registered handlers, in registration order.  A handler
    indicates that it has consumed a frame by raising StopIteration from
    its _handle_frame() method; any other exception is logged and the
    frame is offered to the next handler.
    '''

    # How long (in seconds) the worker blocks on the Rx queue before it
    # re-checks the shutdown event.  May be overridden per instance.
    QUEUE_POLL_TIMEOUT: float = 0.5

    def __init__(self, rxq: queue.Queue, txq: queue.Queue):
        ''' Create a dispatcher reading from rxq; txq is handed to handlers '''
        self.__hlist = list()  # list of (name, handler) tuples
        self._rx_queue = rxq
        self._tx_queue = txq

        # The worker thread is created in start(): a Thread object can only
        # be started once, so a fresh one is needed for every start/stop cycle
        self._thread: Optional[threading.Thread] = None
        self._shutdown = threading.Event()

    def start(self) -> None:
        ''' Start the worker thread (may be called again after stop())

        Raises RuntimeError if the worker thread is already running.
        '''
        if self._thread is not None and self._thread.is_alive():
            raise RuntimeError('The dispatcher thread is already running')
        self._shutdown.clear()
        self._thread = threading.Thread(target=self._worker,
                                        name='DbgMuxPeer-Disp',
                                        daemon=True)
        self._thread.start()

    def stop(self) -> None:
        ''' Request the worker thread to terminate and wait for it

        Calling this method before start() is a no-op.
        '''
        self._shutdown.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    def find_by_name(self, name: str) -> Optional[DbgMuxFrameHandler]:
        ''' Return the first handler registered under the given name, or None '''
        for (hname, hinst) in self.__hlist:
            if hname == name:
                return hinst
        return None

    def register(self, inst: DbgMuxFrameHandler,
                 name: Optional[str] = None) -> None:
        ''' Append a handler to the end of the dispatch list

        Handlers may be anonymous (name is None); named handlers must be
        unique, otherwise FileExistsError is raised.
        '''
        # Compare against None explicitly: a handler object is free to
        # define __len__()/__bool__() and thus evaluate as False
        if name is not None and self.find_by_name(name) is not None:
            raise FileExistsError(f"Handler '{name}' is already registered")
        self.__hlist.append((name, inst))
        # Give a handler access to the Tx queue
        inst._tx_queue = self._tx_queue

    def unregister(self, name: str) -> None:
        ''' Remove the handler registered under the given name

        Raises FileNotFoundError if there is no such handler.
        '''
        inst = self.find_by_name(name)
        if inst is None:
            raise FileNotFoundError(f"Handler '{name}' is not registered")
        self.__hlist.remove((name, inst))

    def _worker(self) -> None:
        ''' Worker thread body: dispatch Rx frames until shutdown is requested '''
        while not self._shutdown.is_set():
            # Block with a timeout, so that the shutdown event gets
            # re-checked periodically even if no frames arrive
            try:
                frame = self._rx_queue.get(block=True,
                                           timeout=self.QUEUE_POLL_TIMEOUT)
            except queue.Empty:
                continue
            try:
                self._dispatch(frame)
            finally:
                # Keep the queue's task accounting balanced for every get()
                self._rx_queue.task_done()
        log.debug("Thread '%s' is shutting down", threading.current_thread().name)

    def _dispatch(self, frame: DbgMuxFrame) -> None:
        ''' Offer a frame to the handlers until one of them consumes it '''
        # Iterate over a snapshot, so that a handler may (un)register
        # handlers while the frame is being dispatched
        for (hname, hinst) in tuple(self.__hlist):
            try:
                hinst._handle_frame(frame)
            except StopIteration:
                # The handler has consumed this frame
                return
            except Exception as e:
                log.error("Handler '%s' raised an exception: %s", hname, e)
                # TODO: remove this handler?
        log.error("DebugMux message %s was not handled, dropping...", frame['Msg'])
|
|
|
|
|
|
class DbgMuxPeer:
    ''' DebugMux peer

    Exchanges DebugMux frames over the given transport.  Two background
    threads move frames between the transport and the internal Rx/Tx
    queues, and a DbgMuxFrameDisp instance (self.disp) consumes the Rx queue.
    '''

    # How long (in seconds) the Tx worker blocks on the Tx queue before it
    # re-checks the shutdown event.  May be overridden per instance.
    QUEUE_POLL_TIMEOUT: float = 0.5

    def __init__(self, io: Transport):
        ''' Create a peer on top of the given transport (threads not started) '''
        self.tx_count: int = 0
        self.rx_count: int = 0
        self.io = io

        # Threads handling Rx/Tx frames.  They are created in start():
        # a Thread object can only be started once, so fresh ones are
        # needed for every start/stop cycle
        self._rx_thread: Optional[threading.Thread] = None
        self._tx_thread: Optional[threading.Thread] = None
        self._shutdown = threading.Event()

        # Internal queues for Rx/Tx frames
        self._rx_queue = queue.Queue()
        self._tx_queue = queue.Queue()

        # Init frame dispatcher
        self.disp = DbgMuxFrameDisp(self._rx_queue,
                                    self._tx_queue)

    def start(self) -> None:
        ''' Start the frame dispatcher and the Rx/Tx threads

        Raises RuntimeError if the Rx/Tx threads are already running.
        '''
        for thread in (self._rx_thread, self._tx_thread):
            if thread is not None and thread.is_alive():
                raise RuntimeError('The Rx/Tx threads are already running')
        self._shutdown.clear()
        # Start the dispatcher first: if that fails, none of
        # our own threads has been started yet
        self.disp.start()
        self._rx_thread = threading.Thread(target=self._rx_worker,
                                           name='DbgMuxPeer-Rx',
                                           daemon=True)
        self._tx_thread = threading.Thread(target=self._tx_worker,
                                           name='DbgMuxPeer-Tx',
                                           daemon=True)
        self._rx_thread.start()
        self._tx_thread.start()

    def stop(self) -> None:
        ''' Stop the Rx/Tx threads and the frame dispatcher '''
        # Set the shutdown event
        self._shutdown.set()
        # Wait for both threads to terminate (if they were ever started)
        if self._tx_thread is not None:
            self._tx_thread.join()
            self._tx_thread = None
        if self._rx_thread is not None:
            self._rx_thread.join()
            self._rx_thread = None
        self.disp.stop()

    def _rx_worker(self) -> None:
        ''' Rx thread body: move received frames into the Rx queue '''
        while not self._shutdown.is_set():
            frame = self._recv()  # blocking until timeout
            if frame is not None:
                self._rx_queue.put(frame)
        log.debug("Thread '%s' is shutting down", threading.current_thread().name)

    def _tx_worker(self) -> None:
        ''' Tx thread body: send messages taken from the Tx queue '''
        while not self._shutdown.is_set():
            # Block with a timeout, so that the shutdown event gets
            # re-checked periodically even if there is nothing to send
            try:
                (msg_type, msg) = self._tx_queue.get(block=True,
                                                     timeout=self.QUEUE_POLL_TIMEOUT)
            except queue.Empty:
                continue
            try:
                self._send(msg_type, msg)
            finally:
                # Account the item even if sending has failed, so that
                # a pending Queue.join() is not blocked forever
                self._tx_queue.task_done()
        log.debug("Thread '%s' is shutting down", threading.current_thread().name)

    def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
        ''' Send a single message (non-blocking call)

        The message is only enqueued here; it is encoded and written to
        the transport by the Tx thread.
        '''
        self._tx_queue.put((msg_type, msg))

    def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
        ''' Encode a single message into a frame and write it to the transport '''
        # Encode the inner message first
        msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)

        c = Container({
            'TxCount': (self.tx_count + 1) % 256,
            'RxCount': self.rx_count % 256,
            'MsgType': msg_type,
            'MsgData': msg_data,
            'FCS': 0  # Calculated below
        })

        # ACK is a bit special
        if msg_type == DbgMuxFrame.MsgType.Ack:
            c['TxCount'] = 0xf1

        # There is a Checksum construct, but it requires all checksummed fields
        # to be wrapped into an additional RawCopy construct.  This is ugly and
        # inconvenient from the API point of view, so we calculate the FCS manually:
        frame = DbgMuxFrame.Frame.build(c)[:-2]  # strip b'\x00\x00'
        c['FCS'] = DbgMuxFrame.fcs_func(frame)

        log.debug('Tx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s',
                  c['TxCount'], c['RxCount'], c['FCS'],
                  c['MsgType'], c['MsgData'].hex())

        self.io.write(frame + Int16ul.build(c['FCS']))

        # ACK is not getting accounted
        if msg_type != DbgMuxFrame.MsgType.Ack:
            self.tx_count += 1

    def recv(self, timeout: Optional[float] = None) -> Container:
        ''' Receive a single message (blocking call)

        Raises queue.Empty if no frame arrives within the given timeout.
        Note that the frame dispatcher (self.disp) takes frames from
        the same Rx queue.
        '''
        return self._rx_queue.get(block=True, timeout=timeout)

    def _recv(self) -> Optional[Container]:
        ''' Read and parse a single frame from the transport

        Returns None if the transport yields no data for the frame magic,
        otherwise the parsed frame with the decoded inner message stored
        under the 'Msg' key.
        '''
        frame: bytes = b''
        frame += self.io.read(2)  # Magic
        if not frame:
            return None
        # NOTE(review): an unexpected magic makes Const.parse() raise, which
        # propagates to the caller (the Rx thread) - confirm this is intended
        Const(b'\x42\x42').parse(frame[:2])
        frame += self.io.read(2)  # Length
        length: int = Int16ul.parse(frame[2:])
        frame += self.io.read(length)  # Rest

        c = DbgMuxFrame.Frame.parse(frame)

        log.debug('Rx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s',
                  c['TxCount'], c['RxCount'], c['FCS'],
                  c['MsgType'], c['MsgData'].hex())

        # Re-calculate and check the FCS.  A mismatch is only logged:
        # the frame is still parsed, counted and returned below.
        fcs = DbgMuxFrame.fcs_func(frame[:-2])
        if fcs != c['FCS']:
            log.error('Rx frame (Ns=%03u, Nr=%03u) with bad FCS: '
                      'indicated 0x%04x != calculated 0x%04x',
                      c['TxCount'], c['RxCount'], c['FCS'], fcs)
            # TODO: NACK this frame?

        # Parse the inner message
        c['Msg'] = DbgMuxFrame.Msg.parse(c['MsgData'], MsgType=c['MsgType'])

        self.rx_count += 1
        return c
|