Implement optional dumping of DebugMux frames to file

This commit is contained in:
Vadim Yanitskiy 2023-01-15 15:16:21 +06:00
parent 5b2c57422d
commit 56b0ad8e6e
5 changed files with 112 additions and 1 deletions

View File

@ -25,6 +25,7 @@ import enum
import sys
from sedbgmux.io import DbgMuxIOModem
from sedbgmux.io import DumpIONative
from sedbgmux import DbgMuxFrame
from sedbgmux import DbgMuxPeer
@ -48,6 +49,11 @@ class SEDbgMuxApp(cmd2.Cmd):
self.io = DbgMuxIOModem(self.argv)
self.peer = DbgMuxPeer(self.io)
# Optionally dump DebugMux frames to a file
if argv.dump_file is not None:
dump = DumpIONative(argv.dump_file, readonly=False)
self.peer.enable_dump(dump)
# Modem connection state
self.set_connected(False)
@ -172,6 +178,8 @@ group.add_argument('--serial-baudrate', metavar='BAUDRATE', type=int, default=11
help='Serial port speed (default %(default)s)')
group.add_argument('--serial-timeout', metavar='TIMEOUT', type=float, default=0.5,
help='Serial port read timeout (default %(default)s)')
group.add_argument('--dump-file', metavar='FILE', type=str,
help='Save Rx/Tx DebugMux frames to a file')
log.basicConfig(
format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO)

View File

@ -1,3 +1,8 @@
from .base import DbgMuxIO
from .base import DbgMuxIOError
from .modem import DbgMuxIOModem
from .base import DumpIO
from .base import DumpIOError
from .base import DumpIOEndOfFile
from .dump_native import DumpIONative

View File

@ -24,6 +24,12 @@ import abc
class DbgMuxIOError(Exception):
''' I/O error during read/write operation '''
class DumpIOError(Exception):
''' Dump I/O error '''
class DumpIOEndOfFile(DumpIOError):
''' Dump I/O EOF error '''
class DbgMuxIO(abc.ABC):
''' Abstract I/O layer for DebugMux '''
@ -43,3 +49,15 @@ class DbgMuxIO(abc.ABC):
@abc.abstractmethod
def write(self, data: bytes) -> int:
''' Write the given data bytes '''
class DumpIO(abc.ABC):
''' Abstract dump I/O interface '''
@abc.abstractmethod
def read(self) -> dict:
''' Read a single record from dump '''
@abc.abstractmethod
def write(self, record: dict) -> None:
''' Store a single record to dump '''

View File

@ -0,0 +1,67 @@
#!/usr/bin/env python3
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import time
from construct import *
from . import DumpIO
from . import DumpIOError
from . import DumpIOEndOfFile
class DumpIONative(DumpIO):
''' Native dump format for this package '''
DumpRecord = Struct(
'Tag' / Const(b'\x01'),
'Length' / Rebuild(Int16ul, lambda ctx: len(ctx.Data) + 5),
'Direction' / Enum(subcon=Int8ul, Rx=0x00, Tx=0x01),
'Timestamp' / Rebuild(Float64l, lambda ctx: ctx.get('Timestamp') or time.time()),
'Data' / Bytes(lambda ctx: ctx.Length - 5),
)
def __init__(self, fname: str, readonly: bool = False) -> None:
log.info('Opening dump file %s (%s mode)',
fname, 'readonly' if readonly else 'write')
mode: str = 'rb' if readonly else 'ab'
self._file = open(fname, mode)
self.readonly = readonly
def read(self) -> dict:
''' Read a single record from dump '''
try:
c = self.DumpRecord.parse_stream(self._file)
except StreamError as e:
raise DumpIOEndOfFile from e
return dict(dir=str(c['Direction']),
timestamp=c['Timestamp'],
data=c['Data'])
def write(self, record: dict) -> None:
''' Store a single record to dump '''
if self.readonly:
raise DumpIOError('Read-only mode')
c = Container(Direction=record['dir'], Data=record['data'])
if 'timestamp' in record:
c['Timestamp'] = record['timestamp']
dr: bytes = self.DumpRecord.build(c)
self._file.write(dr)

View File

@ -22,15 +22,20 @@ from typing import Any, Optional
from construct import Const, Container, Int16ul
from .io import DbgMuxIO
from .io import DumpIO
from . import DbgMuxFrame
class DbgMuxPeer:
def __init__(self, io: DbgMuxIO):
self.dump: Optional[DumpIO] = None
self.tx_count: int = 0
self.rx_count: int = 0
self.io = io
def enable_dump(self, dump: DumpIO):
self.dump = dump
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)
@ -52,12 +57,16 @@ class DbgMuxPeer:
# inconvinient from the API point of view, so we calculate the FCS manually:
frame = DbgMuxFrame.Frame.build(c)[:-2] # strip b'\x00\x00'
c['FCS'] = DbgMuxFrame.fcs_func(frame)
frame += Int16ul.build(c['FCS'])
log.debug('Tx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s',
c['TxCount'], c['RxCount'], c['FCS'],
c['MsgType'], c['MsgData'].hex())
self.io.write(frame + Int16ul.build(c['FCS']))
self.io.write(frame)
if self.dump is not None:
record = dict(dir='Tx', data=frame)
self.dump.write(record)
# ACK is not getting accounted
if msg_type != DbgMuxFrame.MsgType.Ack:
@ -73,6 +82,10 @@ class DbgMuxPeer:
length: int = Int16ul.parse(frame[2:])
frame += self.io.read(length) # Rest
if self.dump is not None:
record = dict(dir='Rx', data=frame)
self.dump.write(record)
c = DbgMuxFrame.Frame.parse(frame)
log.debug('Rx frame (Ns=%03u, Nr=%03u, fcs=0x%04x) %s %s',