SEDbgMuxApp: separate modem transport into its own module

This commit is contained in:
Vadim Yanitskiy 2022-03-28 01:04:54 +03:00
parent 37426267f4
commit 9034073630
3 changed files with 116 additions and 47 deletions

10
peer.py
View File

@ -20,14 +20,16 @@ import logging as log
from typing import Any from typing import Any
from construct import Container, Int16ul from construct import Container, Int16ul
from transport import Transport
from proto import DbgMuxFrame from proto import DbgMuxFrame
class DbgMuxPeer: class DbgMuxPeer:
def __init__(self, sl): def __init__(self, io: Transport):
self.tx_count: int = 0 self.tx_count: int = 0
self.rx_count: int = 0 self.rx_count: int = 0
self._sl = sl self.io = io
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first # Encode the inner message first
@ -55,14 +57,14 @@ class DbgMuxPeer:
c['TxCount'], c['RxCount'], c['FCS'], c['TxCount'], c['RxCount'], c['FCS'],
c['MsgType'], c['MsgData'].hex()) c['MsgType'], c['MsgData'].hex())
self._sl.write(frame + Int16ul.build(c['FCS'])) self.io.write(frame + Int16ul.build(c['FCS']))
# ACK is not getting accounted # ACK is not getting accounted
if msg_type != DbgMuxFrame.MsgType.Ack: if msg_type != DbgMuxFrame.MsgType.Ack:
self.tx_count += 1 self.tx_count += 1
def recv(self) -> Container: def recv(self) -> Container:
c = DbgMuxFrame.Frame.parse_stream(self._sl) c = DbgMuxFrame.Frame.parse_stream(self.io)
log.debug('Rx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s', log.debug('Rx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s',
c['TxCount'], c['RxCount'], c['FCS'], c['TxCount'], c['RxCount'], c['FCS'],

View File

@ -20,11 +20,11 @@
import logging as log import logging as log
import argparse import argparse
import serial
import cmd2 import cmd2
import enum import enum
import sys import sys
from transport import TransportModem
from proto import DbgMuxFrame from proto import DbgMuxFrame
from peer import DbgMuxPeer from peer import DbgMuxPeer
@ -44,6 +44,10 @@ class SEDbgMuxApp(cmd2.Cmd):
self.default_category = 'Built-in commands' self.default_category = 'Built-in commands'
self.argv = argv self.argv = argv
# Init the transport layer and DebugMux peer
self.transport = TransportModem(self.argv)
self.peer = DbgMuxPeer(self.transport)
# Modem connection state # Modem connection state
self.set_connected(False) self.set_connected(False)
@ -58,28 +62,13 @@ class SEDbgMuxApp(cmd2.Cmd):
@cmd2.with_category(CATEGORY_CONN) @cmd2.with_category(CATEGORY_CONN)
def do_connect(self, opts) -> None: def do_connect(self, opts) -> None:
''' Connect to the modem and switch it to DebugMux mode ''' ''' Connect to the modem and switch it to DebugMux mode '''
self.sl = serial.Serial(port=self.argv.serial_port, self.transport.connect()
baudrate=self.argv.serial_baudrate,
bytesize=8, parity='N', stopbits=1,
timeout=self.argv.serial_timeout,
# xonoff=False,
rtscts=False,
dsrdtr=False)
# Test the modem
self.transceive('AT', 'OK')
# Enable DebugMux mode
self.transceive('AT*EDEBUGMUX', 'CONNECT')
# Init DebugMux peer
self.peer = DbgMuxPeer(self.sl)
self.set_connected(True) self.set_connected(True)
@cmd2.with_category(CATEGORY_CONN) @cmd2.with_category(CATEGORY_CONN)
def do_disconnect(self, opts) -> None: def do_disconnect(self, opts) -> None:
''' Disconnect from the modem ''' ''' Disconnect from the modem '''
self.sl.close() self.transport.disconnect()
self.sl = None
self.peer = None
self.set_connected(False) self.set_connected(False)
@cmd2.with_category(CATEGORY_CONN) @cmd2.with_category(CATEGORY_CONN)
@ -108,7 +97,8 @@ class SEDbgMuxApp(cmd2.Cmd):
f['Msg']['DPRef'], f['Msg']['Name']) f['Msg']['DPRef'], f['Msg']['Name'])
# No more data in the buffer # No more data in the buffer
if self.sl.in_waiting == 0: # FIXME: layer violation!
if self.transport._sl.in_waiting == 0:
break break
# ACKnowledge reception of the info # ACKnowledge reception of the info
@ -170,30 +160,6 @@ class SEDbgMuxApp(cmd2.Cmd):
# ACKnowledge reception of a frame # ACKnowledge reception of a frame
self.peer.send(DbgMuxFrame.MsgType.Ack) self.peer.send(DbgMuxFrame.MsgType.Ack)
def send_data(self, data: bytes) -> None:
log.debug("MODEM <- %s", str(data))
self.sl.write(data)
def send_at_cmd(self, cmd: str, handle_echo: bool = True) -> None:
self.send_data(cmd.encode() + b'\r')
if handle_echo:
self.sl.readline()
def read_at_rsp(self) -> str:
rsp = self.sl.readline()
log.debug("MODEM -> %s", str(rsp))
return rsp.rstrip().decode()
def transceive(self, cmd: str, exp: str) -> None:
while True:
self.send_at_cmd(cmd)
rsp = self.read_at_rsp()
if rsp[:7] == '*EMRDY:':
continue
assert rsp == exp
break
ap = argparse.ArgumentParser(prog='sedbgmux', description=SEDbgMuxApp.DESC, ap = argparse.ArgumentParser(prog='sedbgmux', description=SEDbgMuxApp.DESC,
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)

101
transport.py Normal file
View File

@ -0,0 +1,101 @@
#!/usr/bin/env python3
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022 Vadim Yanitskiy <axilirator@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import serial
import abc
class Transport(abc.ABC):
''' Abstract transport layer for DebugMux '''
def connect(self, opts: dict) -> None:
''' Establish connection to the target and enter DebugMux mode '''
def disconnect(self) -> None:
''' Escape DebugMux mode and terminate connection with the target '''
def write(self, data: bytes) -> int:
''' Write the given data bytes '''
def read(self, length: int = 0) -> bytes:
''' Read the given number of bytes '''
class TransportModem(Transport):
''' Modem based transport layer for DebugMux '''
def __init__(self, opts: dict) -> None:
self.modem_port = opts.serial_port
self.modem_baudrate = opts.serial_baudrate
self.modem_timeout = opts.serial_timeout
def connect(self) -> None:
''' Establish connection to the target and enter DebugMux mode '''
self._sl = serial.Serial(port=self.modem_port,
baudrate=self.modem_baudrate,
bytesize=8, parity='N', stopbits=1,
timeout=self.modem_timeout,
# xonoff=False,
rtscts=False,
dsrdtr=False)
# Test the modem
self.transceive('AT', 'OK')
# Enable DebugMux mode
self.transceive('AT*EDEBUGMUX', 'CONNECT')
def disconnect(self) -> None:
''' Escape DebugMux mode and terminate connection with the target '''
# TODO: escape DebugMux mode
self._sl.close()
del self._sl
def write(self, data: bytes) -> int:
''' Write the given data bytes '''
return self._sl.write(data)
def read(self, length: int = 0) -> bytes:
''' Read the given number of bytes '''
return self._sl.read(length)
def send_at_cmd(self, cmd: str, handle_echo: bool = True) -> None:
''' Send an AT command to the modem '''
data: bytes = cmd.encode() + b'\r'
log.debug('MODEM <- %s', str(data))
self.write(data)
if handle_echo:
self._sl.readline()
def read_at_rsp(self) -> str:
''' Read an AT command response from the modem '''
rsp = self._sl.readline()
log.debug('MODEM -> %s', str(rsp))
return rsp.rstrip().decode()
def transceive(self, cmd: str, exp: str) -> None:
while True:
self.send_at_cmd(cmd)
rsp = self.read_at_rsp()
if rsp[:7] == '*EMRDY:':
continue
assert rsp == exp
break