sedbgmux/peer.py

215 lines
7.6 KiB
Python

# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022 Vadim Yanitskiy <axilirator@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import queue
from typing import Any, Optional
from construct import Const, Container, Int16ul
from handlers import DbgMuxFrameHandler
from transport import Transport
from proto import DbgMuxFrame
class DbgMuxFrameDisp:
''' DebugMux frame dispatcher '''
def __init__(self, rxq: queue.Queue, txq: queue.Queue):
self.__hlist = list()
self._rx_queue = rxq
self._tx_queue = txq
self._thread = threading.Thread(target=self._worker,
name='DbgMuxPeer-Disp',
daemon=True)
self._shutdown = threading.Event()
def start(self) -> None:
self._shutdown.clear()
self._thread.start()
def stop(self) -> None:
self._shutdown.set()
self._thread.join()
def find_by_name(self, name: str) -> Optional[DbgMuxFrameHandler]:
for (hname, hinst) in self.__hlist:
if hname == name:
return hinst
return None
def register(self, inst: DbgMuxFrameHandler,
name: Optional[str] = None) -> None:
if name is not None and self.find_by_name(name):
raise FileExistsError
self.__hlist.append((name, inst))
# Give a handler access to the Tx queue
inst._tx_queue = self._tx_queue
def unregister(self, name: str):
inst = self.find_by_name(name)
if inst is None:
raise FileNotFoundError
self.__hlist.remove((name, inst))
def _worker(self) -> None:
while not self._shutdown.is_set():
try:
frame = self._rx_queue.get(block=True, timeout=0.5)
self._dispatch(frame)
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _dispatch(self, frame: DbgMuxFrame) -> None:
for (hname, hinst) in self.__hlist:
try:
hinst._handle_frame(frame)
except StopIteration:
return
except Exception as e:
log.error("Handler '%s' raised an exception: %s", hname, e)
# TODO: remove this handler?
log.error("DebugMux message %s was not handled, dropping...", frame['Msg'])
class DbgMuxPeer:
def __init__(self, io: Transport):
self.tx_count: int = 0
self.rx_count: int = 0
self.io = io
# Threads handling Rx/Tx frames
self._rx_thread = threading.Thread(target=self._rx_worker,
name='DbgMuxPeer-Rx',
daemon=True)
self._tx_thread = threading.Thread(target=self._tx_worker,
name='DbgMuxPeer-Tx',
daemon=True)
self._shutdown = threading.Event()
# Internal queues for Rx/Tx frames
self._rx_queue = queue.Queue()
self._tx_queue = queue.Queue()
# Init frame dispatcher
self.disp = DbgMuxFrameDisp(self._rx_queue,
self._tx_queue)
# TODO: self.disp.start()
def start(self) -> None:
self._shutdown.clear()
self._rx_thread.start()
self._tx_thread.start()
def stop(self) -> None:
# Set the shutdown event
self._shutdown.set()
# Wait for both threads to terminate
self._tx_thread.join()
self._rx_thread.join()
def _rx_worker(self) -> None:
while not self._shutdown.is_set():
frame = self._recv() # blocking until timeout
if frame is not None:
self._rx_queue.put(frame)
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _tx_worker(self) -> None:
while not self._shutdown.is_set():
try:
(msg_type, msg) = self._tx_queue.get(block=True, timeout=0.5)
self._send(msg_type, msg)
self._tx_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
''' Send a single message (non-blocking call) '''
self._tx_queue.put((msg_type, msg))
def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)
c = Container({
'TxCount': (self.tx_count + 1) % 256,
'RxCount': self.rx_count % 256,
'MsgType': msg_type,
'MsgData': msg_data,
'FCS': 0 # Calculated below
})
# ACK is a bit special
if msg_type == DbgMuxFrame.MsgType.Ack:
c['TxCount'] = 0xf1
# There is a Checksum construct, but it requires all checksummed fields
# to be wrapped into an additional RawCopy construct. This is ugly and
# inconvinient from the API point of view, so we calculate the FCS manually:
frame = DbgMuxFrame.Frame.build(c)[:-2] # strip b'\x00\x00'
c['FCS'] = DbgMuxFrame.fcs_func(frame)
log.debug('Tx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s',
c['TxCount'], c['RxCount'], c['FCS'],
c['MsgType'], c['MsgData'].hex())
self.io.write(frame + Int16ul.build(c['FCS']))
# ACK is not getting accounted
if msg_type != DbgMuxFrame.MsgType.Ack:
self.tx_count += 1
def recv(self, timeout: Optional[float] = None) -> Container:
''' Receive a single message (blocking call) '''
return self._rx_queue.get(block=True, timeout=timeout)
def _recv(self) -> Optional[Container]:
frame: bytes = b''
frame += self.io.read(2) # Magic
if frame == b'':
return None
Const(b'\x42\x42').parse(frame[:2])
frame += self.io.read(2) # Length
length: int = Int16ul.parse(frame[2:])
frame += self.io.read(length) # Rest
c = DbgMuxFrame.Frame.parse(frame)
log.debug('Rx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s',
c['TxCount'], c['RxCount'], c['FCS'],
c['MsgType'], c['MsgData'].hex())
# Re-calculate and check the FCS
fcs = DbgMuxFrame.fcs_func(frame[:-2])
if fcs != c['FCS']:
log.error('Rx frame (Ns=%03u, Nr=%03u) with bad FCS: '
'indicated 0x%04x != calculated 0x%04x',
c['TxCount'], c['RxCount'], c['FCS'], fcs)
# TODO: NACK this frame?
# Parse the inner message
c['Msg'] = DbgMuxFrame.Msg.parse(c['MsgData'], MsgType=c['MsgType'])
self.rx_count += 1
return c