sedbgmux/sedbgmux.py

226 lines
8.6 KiB
Python
Executable File

#!/usr/bin/env python3
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022 Vadim Yanitskiy <axilirator@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import argparse
import cmd2
import enum
import sys
import time
from transport import TransportModem
from proto import DbgMuxFrame
from peer import DbgMuxPeer
from construct import Container
from handlers import DbgMuxFrameHandler
from handlers import DbgMuxConnUdpBridge
from handlers import DbgMuxConnInteractiveTerminal
class CommonFrameHandler(DbgMuxFrameHandler):
    ''' Handles some common messages

    Ident, DPAnnounce and FlowControl frames, as well as ConnEstablished
    and ConnTerminated frames carrying ConnRef=0xffff, are logged and then
    acknowledged.  Ack frames are dropped without a reply.  Any other
    frame is left alone, to be consumed by other handlers. '''

    def __init__(self, cmd2):
        # Object handed over by whoever creates this handler
        self.cmd2 = cmd2

    def _handle_frame(self, frame: Container) -> None:
        ''' Log the given frame (if it is one of ours) and acknowledge it

        Raises StopIteration once the frame has been dealt with here;
        returns normally if the frame is not handled by this class. '''
        msg_type = frame['MsgType']
        types = DbgMuxFrame.MsgType

        if msg_type == types.Ident:
            ident = frame['Msg']['Ident']
            # The trailing 15 characters are reported as the IMEI,
            # everything before them as the target name
            log.info("Identified target: '%s', IMEI=%s",
                     ident[:-15], ident[-15:])
        elif msg_type == types.DPAnnounce:
            announce = frame['Msg']
            log.info("Data Provider available (DPRef=0x%04x): '%s'",
                     announce['DPRef'], announce['Name'])
        elif msg_type == types.FlowControl:
            log.warning("Rx FlowControl message, which is not yet supported")
        elif msg_type == types.ConnEstablished \
                and frame['Msg']['ConnRef'] == 0xffff:
            # ConnRef=0xffff is treated as a failure indication
            log.warning("Connection establishment failed, "
                        "no such DPRef=0x%04x?", frame['Msg']['DPRef'])
        elif msg_type == types.ConnTerminated \
                and frame['Msg']['ConnRef'] == 0xffff:
            log.warning("Connection termination failed, "
                        "no such DPRef=0x%04x?", frame['Msg']['DPRef'])
        elif msg_type == types.Ack:
            # Nothing to log, and an Ack is not acknowledged itself
            raise StopIteration
        else:
            # To be consumed by other handlers
            return

        # Every frame logged above gets acknowledged
        self.send(types.Ack)
        raise StopIteration
class PingPongHandler(DbgMuxFrameHandler):
    ''' Sends DbgMuxFrame.MsgType.Ping, handles DbgMuxFrame.MsgType.Pong '''

    def __init__(self):
        # Set by ping(), cleared when an expected Pong has been received
        self.expect_pong: bool = False

    def ping(self, payload: str):
        ''' Send a Ping with the given payload and start expecting a Pong '''
        log.info("Tx Ping with payload '%s'", payload)
        self.send(DbgMuxFrame.MsgType.Ping, payload)
        self.expect_pong = True
        # TODO: start a timer?

    def handle_pong(self, payload: str):
        ''' Log a received Pong, or warn if no Pong was expected '''
        if self.expect_pong:
            log.info("Rx Pong with payload '%s'", payload)
            self.expect_pong = False
        else:
            log.warning('Rx unexpected Pong, sending ACK anyway')

    def _handle_frame(self, frame: Container) -> None:
        ''' Process and acknowledge Pong frames, ignore everything else

        Raises StopIteration after a Pong has been acknowledged. '''
        is_pong = frame['MsgType'] == DbgMuxFrame.MsgType.Pong
        if not is_pong:
            return

        self.handle_pong(frame['Msg'])
        # The Ack is sent regardless of whether the Pong was expected
        self.send(DbgMuxFrame.MsgType.Ack)
        raise StopIteration
class SEDbgMuxApp(cmd2.Cmd):
    # Interactive shell of the DebugMux client.  The docstrings of the
    # do_*() methods are kept short on purpose: they are command
    # descriptions, not developer documentation.
    DESC = 'DebugMux client for [Sony] Ericsson phones and modems'

    # Command categories
    CATEGORY_CONN = 'Connection management commands'
    CATEGORY_DBGMUX = 'DebugMux specific commands'

    def __init__(self, argv):
        ''' Set up the shell, the transport layer and the DebugMux peer

        argv is the namespace of parsed command line arguments; it is
        stored as-is and also handed over to TransportModem. '''
        super().__init__(allow_cli_args=False)

        self.argv = argv
        self.default_category = 'Built-in commands'
        self.prompt = "DebugMux ('%s')> " % argv.serial_port
        self.intro = cmd2.style('Welcome to %s!' % self.DESC, fg=cmd2.fg.red)

        # Init the transport layer and DebugMux peer
        self.transport = TransportModem(self.argv)
        self.peer = DbgMuxPeer(self.transport)

        # Register DebugMux frame handlers
        dispatcher = self.peer.disp
        dispatcher.register(CommonFrameHandler(self), 'Common')
        dispatcher.register(PingPongHandler(), 'PingPong')

        # Modem connection state (starts out disconnected)
        self.set_connected(False)

    def set_connected(self, state: bool) -> None:
        ''' Store the connection state and toggle the DebugMux commands '''
        self.connected: bool = state
        if not state:
            reason = 'You must be connected to use this command'
            self.disable_category(self.CATEGORY_DBGMUX, reason)
            return
        self.enable_category(self.CATEGORY_DBGMUX)

    @cmd2.with_category(CATEGORY_CONN)
    def do_connect(self, opts) -> None:
        ''' Connect to the modem and switch it to DebugMux mode '''
        self.transport.connect()
        self.peer.start()
        self.set_connected(True)

    @cmd2.with_category(CATEGORY_CONN)
    def do_disconnect(self, opts) -> None:
        ''' Disconnect from the modem '''
        # Tear down in the reverse order of do_connect()
        self.peer.stop()
        self.transport.disconnect()
        self.set_connected(False)

    @cmd2.with_category(CATEGORY_CONN)
    def do_status(self, opts) -> None:
        ''' Print connection info and statistics '''
        if self.connected:
            self.poutput("Connected to '%s'" % self.argv.serial_port)
            self.poutput('Baudrate: %d' % self.argv.serial_baudrate)
            self.poutput('TxCount (Ns): %d' % self.peer.tx_count)
            self.poutput('RxCount (Nr): %d' % self.peer.rx_count)
        else:
            self.poutput('Not connected')

    @cmd2.with_category(CATEGORY_DBGMUX)
    def do_enquiry(self, opts) -> None:
        ''' Enquiry target identifier and available Data Providers '''
        enquiry = DbgMuxFrame.MsgType.Enquiry
        self.peer.send(enquiry)
        # The response is to be handled by CommonFrameHandler
        time.sleep(0.5)

    ping_parser = cmd2.Cmd2ArgumentParser()
    ping_parser.add_argument(
        '-p', '--payload',
        type=str,
        default='Knock, knock!',
        help='Ping payload',
    )

    @cmd2.with_argparser(ping_parser)
    @cmd2.with_category(CATEGORY_DBGMUX)
    def do_ping(self, opts) -> None:
        ''' Send a Ping to the target, expect Pong '''
        # The handler registered under the name 'PingPong' does the work
        self.peer.disp.find_by_name('PingPong').ping(opts.payload)

    establish_parser = cmd2.Cmd2ArgumentParser()
    establish_parser.add_argument(
        'DPRef',
        type=lambda text: int(text, 16),
        help='DPRef of a Data Provider in hex',
    )
    establish_parser.add_argument(
        'mode',
        choices=['interactive', 'udp-bridge'],
        help='Connection mode',
    )

    @cmd2.with_argparser(establish_parser)
    @cmd2.with_category(CATEGORY_DBGMUX)
    def do_establish(self, opts) -> None:
        ''' Establish connection with a Data Provider '''
        mode = opts.mode
        if mode == 'interactive':
            ch = DbgMuxConnInteractiveTerminal()
        elif mode == 'udp-bridge':
            ch = DbgMuxConnUdpBridge()
        else:
            return

        # Both modes register the connection handler before establishing
        self.peer.disp.register(ch)
        ch.establish(opts.DPRef)

        # Only the interactive terminal is attached afterwards
        if mode == 'interactive':
            ch.attach()

    terminate_parser = cmd2.Cmd2ArgumentParser()
    terminate_parser.add_argument(
        'ConnRef',
        type=lambda text: int(text, 16),
        help='ConnRef in hex',
    )

    @cmd2.with_argparser(terminate_parser)
    @cmd2.with_category(CATEGORY_DBGMUX)
    def do_terminate(self, opts) -> None:
        ''' Terminate connection with a Data Provider '''
        request = {'ConnRef': opts.ConnRef}
        self.peer.send(DbgMuxFrame.MsgType.ConnTerminate, request)
# Log records carry the level, the originating file and the line number
log.basicConfig(
    level=log.INFO,
    format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s',
)

# Command line interface of the application
ap = argparse.ArgumentParser(
    prog='sedbgmux',
    description=SEDbgMuxApp.DESC,
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

group = ap.add_argument_group('Connection parameters')
group.add_argument(
    '-p', '--serial-port',
    metavar='PORT',
    type=str,
    default='/dev/ttyACM0',
    help='Serial port path (default %(default)s)',
)
group.add_argument(
    '--serial-baudrate',
    metavar='BAUDRATE',
    type=int,
    default=115200,
    help='Serial port speed (default %(default)s)',
)
group.add_argument(
    '--serial-timeout',
    metavar='TIMEOUT',
    type=float,
    default=0.5,
    help='Serial port read timeout (default %(default)s)',
)

if __name__ == '__main__':
    # Parse the arguments, run the interactive shell, and pass whatever
    # cmdloop() returns on to sys.exit()
    argv = ap.parse_args()
    app = SEDbgMuxApp(argv)
    sys.exit(app.cmdloop())