# This file is a part of sedbgmux, an open source DebugMux client. # Copyright (c) 2022 Vadim Yanitskiy # # SPDX-License-Identifier: GPL-3.0-or-later # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging as log import threading import queue import enum import abc from typing import Any, Optional from construct import Container from proto import DbgMuxFrame class DbgMuxFrameHandler(abc.ABC): ''' Abstract DebugMux frame handler ''' _tx_queue: Optional[queue.Queue] = None def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b''): ''' Called by child classes to send some message ''' assert self._tx_queue is not None self._tx_queue.put((msg_type, msg)) @abc.abstractmethod def _handle_frame(self, frame: Container) -> None: ''' Handle the given DebugMux frame ''' class ConnState(enum.Enum): ''' Connection state for DbgMuxConnHandler ''' NotEstablished = enum.auto() Establishing = enum.auto() Established = enum.auto() class DbgMuxConnHandler(DbgMuxFrameHandler): ''' Abstract DebugMux connection handler ''' def __init__(self): self.conn_state: ConnState = ConnState.NotEstablished self.ConnRef: int = 0xffff self.DPRef: int = 0xffff def establish(self, DPRef: int) -> None: assert self.conn_state == ConnState.NotEstablished log.info("Establishing connection with DPRef=0x%04x", DPRef) self.send(DbgMuxFrame.MsgType.ConnEstablish, dict(DPRef=DPRef)) self.conn_state = ConnState.Establishing self.DPRef = DPRef def send_data(self, data: bytes) -> None: ''' Called by child classes to send connection data ''' assert self.conn_state == ConnState.Established msg = dict(ConnRef=self.ConnRef, Data=data) self.send(DbgMuxFrame.MsgType.ConnData, msg) def _match(self, frame: Container, msg_type: DbgMuxFrame.MsgType, msg_fields: dict = { }) -> bool: if frame['MsgType'] != msg_type: return False for (key, val) in msg_fields.items(): if key not in frame['Msg']: return False if frame['Msg'][key] != val: return False return True def _handle_frame(self, frame: Container) -> None: ''' Handle the given DebugMux frame ''' if self.conn_state == ConnState.Established: fields = dict(ConnRef=self.ConnRef) if self._match(frame, DbgMuxFrame.MsgType.ConnData, fields): self._handle_data(frame['Msg']['Data']) self.send(DbgMuxFrame.MsgType.Ack) raise StopIteration elif self._match(frame, DbgMuxFrame.MsgType.ConnTerminated, fields): log.info('Connection terminated (DPRef=0x%04x, ConnRef=0x%04x)', self.DPRef, self.ConnRef) self.conn_state = ConnState.NotEstablished self._handle_terminate() self.send(DbgMuxFrame.MsgType.Ack) raise StopIteration elif self.conn_state == ConnState.Establishing: # Match ConnEstablished with our DPRef and any ConnRef if self._match(frame, DbgMuxFrame.MsgType.ConnEstablished, dict(DPRef=self.DPRef)): log.info('Connection established (DPRef=0x%04x, ConnRef=0x%04x)', self.DPRef, frame['Msg']['ConnRef']) self.conn_state = ConnState.Established self.ConnRef = frame['Msg']['ConnRef'] self._handle_establish() self.send(DbgMuxFrame.MsgType.Ack) raise StopIteration @abc.abstractmethod def _handle_data(self, data: bytes) -> None: ''' Called on reciept of connection data ''' @abc.abstractmethod def _handle_establish(self) -> None: ''' Called on connection establishment ''' @abc.abstractmethod def _handle_terminate(self) -> None: ''' Called on connection termination '''