From af18b58e1f2e46dccd143aa318f193e2453881ac Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Fri, 24 Jun 2022 19:18:47 +0700 Subject: [PATCH] Implement the concept of the frame dispatcher and handlers --- handlers.py | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++ peer.py | 69 ++++++++++++++++++++++++++++- 2 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 handlers.py diff --git a/handlers.py b/handlers.py new file mode 100644 index 0000000..676b630 --- /dev/null +++ b/handlers.py @@ -0,0 +1,122 @@ +# This file is a part of sedbgmux, an open source DebugMux client. +# Copyright (c) 2022 Vadim Yanitskiy +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging as log +import threading +import queue +import enum +import abc + +from typing import Any, Optional +from construct import Container +from proto import DbgMuxFrame + + +class DbgMuxFrameHandler(abc.ABC): + ''' Abstract DebugMux frame handler ''' + + _tx_queue: Optional[queue.Queue] = None + + def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b''): + ''' Called by child classes to send some message ''' + assert self._tx_queue is not None + self._tx_queue.put((msg_type, msg)) + + @abc.abstractmethod + def _handle_frame(self, frame: Container) -> None: + ''' Handle the given DebugMux frame ''' + + +class ConnState(enum.Enum): + ''' Connection state for DbgMuxConnHandler ''' + NotEstablished = enum.auto() + Establishing = enum.auto() + Established = enum.auto() + + +class DbgMuxConnHandler(DbgMuxFrameHandler): + ''' Abstract DebugMux connection handler ''' + + def __init__(self): + self.conn_state: ConnState = ConnState.NotEstablished + self.ConnRef: int = 0xffff + self.DPRef: int = 0xffff + + def establish(self, DPRef: int) -> None: + assert self.conn_state == ConnState.NotEstablished + log.info("Establishing connection with DPRef=0x%04x", DPRef) + self.send(DbgMuxFrame.MsgType.ConnEstablish, dict(DPRef=DPRef)) + self.conn_state = ConnState.Establishing + self.DPRef = DPRef + + def send_data(self, data: bytes) -> None: + ''' Called by child classes to send connection data ''' + assert self.conn_state == ConnState.Established + msg = dict(ConnRef=self.ConnRef, Data=data) + self.send(DbgMuxFrame.MsgType.ConnData, msg) + + def _match(self, frame: Container, + msg_type: DbgMuxFrame.MsgType, + msg_fields: dict = { }) -> bool: + if frame['MsgType'] != msg_type: + return False + for (key, val) in msg_fields.items(): + if key not in frame['Msg']: + return False + if frame['Msg'][key] != val: + return False + return True + + def _handle_frame(self, frame: Container) -> None: + ''' Handle the given DebugMux frame ''' + + if self.conn_state == ConnState.Established: + fields = dict(ConnRef=self.ConnRef) + if self._match(frame, DbgMuxFrame.MsgType.ConnData, fields): + self._handle_data(frame['Msg']['Data']) + self.send(DbgMuxFrame.MsgType.Ack) + raise StopIteration + elif self._match(frame, DbgMuxFrame.MsgType.ConnTerminated, fields): + log.info('Connection terminated (DPRef=0x%04x, ConnRef=0x%04x)', + self.DPRef, self.ConnRef) + self.conn_state = ConnState.NotEstablished + self._handle_terminate() + self.send(DbgMuxFrame.MsgType.Ack) + raise StopIteration + elif self.conn_state == ConnState.Establishing: + # Match ConnEstablished with our DPRef and any ConnRef + if self._match(frame, DbgMuxFrame.MsgType.ConnEstablished, dict(DPRef=self.DPRef)): + log.info('Connection established (DPRef=0x%04x, ConnRef=0x%04x)', + self.DPRef, frame['Msg']['ConnRef']) + self.conn_state = ConnState.Established + self.ConnRef = frame['Msg']['ConnRef'] + self._handle_establish() + self.send(DbgMuxFrame.MsgType.Ack) + raise StopIteration + + @abc.abstractmethod + def _handle_data(self, data: bytes) -> None: + ''' Called on reciept of connection data ''' + + @abc.abstractmethod + def _handle_establish(self) -> None: + ''' Called on connection establishment ''' + + @abc.abstractmethod + def _handle_terminate(self) -> None: + ''' Called on connection termination ''' diff --git a/peer.py b/peer.py index ae17614..5265dd2 100644 --- a/peer.py +++ b/peer.py @@ -23,10 +23,73 @@ import queue from typing import Any, Optional from construct import Const, Container, Int16ul +from handlers import DbgMuxFrameHandler from transport import Transport from proto import DbgMuxFrame +class DbgMuxFrameDisp: + ''' DebugMux frame dispatcher ''' + + def __init__(self, rxq: queue.Queue, txq: queue.Queue): + self.__hlist = list() + self._rx_queue = rxq + self._tx_queue = txq + + self._thread = threading.Thread(target=self._worker, + name='DbgMuxPeer-Disp', + daemon=True) + self._shutdown = threading.Event() + + def start(self) -> None: + self._shutdown.clear() + self._thread.start() + + def stop(self) -> None: + self._shutdown.set() + self._thread.join() + + def find_by_name(self, name: str) -> Optional[DbgMuxFrameHandler]: + for (hname, hinst) in self.__hlist: + if hname == name: + return hinst + return None + + def register(self, inst: DbgMuxFrameHandler, + name: Optional[str] = None) -> None: + if name is not None and self.find_by_name(name): + raise FileExistsError + self.__hlist.append((name, inst)) + # Give a handler access to the Tx queue + inst._tx_queue = self._tx_queue + + def unregister(self, name: str): + inst = self.find_by_name(name) + if inst is None: + raise FileNotFoundError + self.__hlist.remove((name, inst)) + + def _worker(self) -> None: + while not self._shutdown.is_set(): + try: + frame = self._rx_queue.get(block=True, timeout=0.5) + self._dispatch(frame) + except queue.Empty: + pass + log.debug('Thread \'%s\' is shutting down', threading.current_thread().name) + + def _dispatch(self, frame: DbgMuxFrame) -> None: + for (hname, hinst) in self.__hlist: + try: + hinst._handle_frame(frame) + except StopIteration: + return + except Exception as e: + log.error("Handler '%s' raised an exception: %s", hname, e) + # TODO: remove this handler? + log.error("DebugMux message %s was not handled, dropping...", frame['Msg']) + + class DbgMuxPeer: def __init__(self, io: Transport): self.tx_count: int = 0 @@ -40,13 +103,17 @@ class DbgMuxPeer: self._tx_thread = threading.Thread(target=self._tx_worker, name='DbgMuxPeer-Tx', daemon=True) - self._shutdown = threading.Event() # Internal queues for Rx/Tx frames self._rx_queue = queue.Queue() self._tx_queue = queue.Queue() + # Init frame dispatcher + self.disp = DbgMuxFrameDisp(self._rx_queue, + self._tx_queue) + # TODO: self.disp.start() + def start(self) -> None: self._shutdown.clear() self._rx_thread.start()