sedbgmux/sedbgmux/io/dump_native.py

71 lines
2.4 KiB
Python

#!/usr/bin/env python3
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from construct import *
from . import DumpIO
from . import DumpIOError
from . import DumpIOEndOfFile
# local logger for this module
log = logging.getLogger(__name__)
class DumpIONative(DumpIO):
''' Native dump format for this package '''
DumpRecord = Struct(
'Tag' / Const(b'\x01'),
'Length' / Rebuild(Int16ul, lambda ctx: len(ctx.Data) + 5),
'Direction' / Enum(subcon=Int8ul, Rx=0x00, Tx=0x01),
'Timestamp' / Default(Float64l, lambda ctx: time.time()),
'Data' / Bytes(lambda ctx: ctx.Length - 5),
)
def __init__(self, fname: str, readonly: bool = False) -> None:
log.info('Opening dump file %s (%s mode)',
fname, 'readonly' if readonly else 'write')
mode: str = 'rb' if readonly else 'ab'
self._file = open(fname, mode)
self.readonly = readonly
def read(self) -> dict:
''' Read a single record from dump '''
try:
c = self.DumpRecord.parse_stream(self._file)
except StreamError as e:
raise DumpIOEndOfFile from e
return dict(dir=str(c['Direction']),
timestamp=c['Timestamp'],
data=c['Data'])
def write(self, record: dict) -> None:
''' Store a single record to dump '''
if self.readonly:
raise DumpIOError('Read-only mode')
c = Container(Direction=record['dir'], Data=record['data'])
if 'timestamp' in record:
c['Timestamp'] = record['timestamp']
dr: bytes = self.DumpRecord.build(c)
self._file.write(dr)