#!/usr/bin/env python3 # This file is a part of sedbgmux, an open source DebugMux client. # Copyright (c) 2022 Vadim Yanitskiy # # SPDX-License-Identifier: GPL-3.0-or-later # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging as log import argparse import cmd2 import enum import sys import time from transport import TransportModem from proto import DbgMuxFrame from peer import DbgMuxPeer from construct import Container from handlers import DbgMuxFrameHandler from handlers import DbgMuxConnUdpBridge from handlers import DbgMuxConnInteractiveTerminal class CommonFrameHandler(DbgMuxFrameHandler): ''' Handles some common messages ''' def __init__(self, cmd2): self.cmd2 = cmd2 def _handle_frame(self, frame: Container) -> None: if frame['MsgType'] == DbgMuxFrame.MsgType.Ident: log.info("Identified target: '%s', IMEI=%s", frame['Msg']['Ident'][:-15], frame['Msg']['Ident'][-15:]) elif frame['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce: log.info("Data Provider available (DPRef=0x%04x): '%s'", frame['Msg']['DPRef'], frame['Msg']['Name']) elif frame['MsgType'] == DbgMuxFrame.MsgType.FlowControl: log.warning("Rx FlowControl message, which is not yet supported") elif frame['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished \ and frame['Msg']['ConnRef'] == 0xffff: log.warning("Connection establishment failed, " "no such DPRef=0x%04x?", frame['Msg']['DPRef']) elif frame['MsgType'] == DbgMuxFrame.MsgType.ConnTerminated \ and frame['Msg']['ConnRef'] == 0xffff: log.warning("Connection termination failed, " "no such DPRef=0x%04x?", frame['Msg']['DPRef']) elif frame['MsgType'] == DbgMuxFrame.MsgType.Ack: raise StopIteration else: # To be consumed by other handlers return self.send(DbgMuxFrame.MsgType.Ack) raise StopIteration class PingPongHandler(DbgMuxFrameHandler): ''' Handles DbgMuxFrame.MsgType.Pong ''' def __init__(self): self.expect_pong: bool = False def ping(self, payload: str): log.info('Tx Ping with payload \'%s\'', payload) self.send(DbgMuxFrame.MsgType.Ping, payload) self.expect_pong = True # TODO: start a timer? def handle_pong(self, payload: str): if not self.expect_pong: log.warning('Rx unexpected Pong, sending ACK anyway') return log.info('Rx Pong with payload \'%s\'', payload) self.expect_pong = False def _handle_frame(self, frame: Container) -> None: if frame['MsgType'] == DbgMuxFrame.MsgType.Pong: self.handle_pong(frame['Msg']) self.send(DbgMuxFrame.MsgType.Ack) raise StopIteration class SEDbgMuxApp(cmd2.Cmd): DESC = 'DebugMux client for [Sony] Ericsson phones and modems' # Command categories CATEGORY_CONN = 'Connection management commands' CATEGORY_DBGMUX = 'DebugMux specific commands' def __init__(self, argv): super().__init__(allow_cli_args=False) self.intro = cmd2.style('Welcome to %s!' % self.DESC, fg=cmd2.fg.red) self.prompt = 'DebugMux (\'%s\')> ' % argv.serial_port self.default_category = 'Built-in commands' self.argv = argv # Init the transport layer and DebugMux peer self.transport = TransportModem(self.argv) self.peer = DbgMuxPeer(self.transport) # Register DebugMux frame handlers self.peer.disp.register(CommonFrameHandler(self), 'Common') self.peer.disp.register(PingPongHandler(), 'PingPong') # Modem connection state self.set_connected(False) def set_connected(self, state: bool) -> None: self.connected: bool = state if self.connected: self.enable_category(self.CATEGORY_DBGMUX) else: msg = 'You must be connected to use this command' self.disable_category(self.CATEGORY_DBGMUX, msg) @cmd2.with_category(CATEGORY_CONN) def do_connect(self, opts) -> None: ''' Connect to the modem and switch it to DebugMux mode ''' self.transport.connect() self.peer.start() self.set_connected(True) @cmd2.with_category(CATEGORY_CONN) def do_disconnect(self, opts) -> None: ''' Disconnect from the modem ''' self.peer.stop() self.transport.disconnect() self.set_connected(False) @cmd2.with_category(CATEGORY_CONN) def do_status(self, opts) -> None: ''' Print connection info and statistics ''' if not self.connected: self.poutput('Not connected') return self.poutput('Connected to \'%s\'' % self.argv.serial_port) self.poutput('Baudrate: %d' % self.argv.serial_baudrate) self.poutput('TxCount (Ns): %d' % self.peer.tx_count) self.poutput('RxCount (Nr): %d' % self.peer.rx_count) @cmd2.with_category(CATEGORY_DBGMUX) def do_enquiry(self, opts) -> None: ''' Enquiry target identifier and available Data Providers ''' self.peer.send(DbgMuxFrame.MsgType.Enquiry) # The responce to be handled by CommonFrameHandler time.sleep(0.5) ping_parser = cmd2.Cmd2ArgumentParser() ping_parser.add_argument('-p', '--payload', type=str, default='Knock, knock!', help='Ping payload') @cmd2.with_argparser(ping_parser) @cmd2.with_category(CATEGORY_DBGMUX) def do_ping(self, opts) -> None: ''' Send a Ping to the target, expect Pong ''' hinst = self.peer.disp.find_by_name('PingPong') hinst.ping(opts.payload) establish_parser = cmd2.Cmd2ArgumentParser() establish_parser.add_argument('DPRef', type=lambda v: int(v, 16), help='DPRef of a Data Provider in hex') establish_parser.add_argument('mode', choices=['interactive', 'udp-bridge'], help='Connection mode') @cmd2.with_argparser(establish_parser) @cmd2.with_category(CATEGORY_DBGMUX) def do_establish(self, opts) -> None: ''' Establish connection with a Data Provider ''' if opts.mode == 'interactive': ch = DbgMuxConnInteractiveTerminal() self.peer.disp.register(ch) ch.establish(opts.DPRef) ch.attach() elif opts.mode == 'udp-bridge': ch = DbgMuxConnUdpBridge() self.peer.disp.register(ch) ch.establish(opts.DPRef) ap = argparse.ArgumentParser(prog='sedbgmux', description=SEDbgMuxApp.DESC, formatter_class=argparse.ArgumentDefaultsHelpFormatter) group = ap.add_argument_group('Connection parameters') group.add_argument('-p', '--serial-port', metavar='PORT', type=str, default='/dev/ttyACM0', help='Serial port path (default %(default)s)') group.add_argument('--serial-baudrate', metavar='BAUDRATE', type=int, default=115200, help='Serial port speed (default %(default)s)') group.add_argument('--serial-timeout', metavar='TIMEOUT', type=float, default=0.5, help='Serial port read timeout (default %(default)s)') log.basicConfig( format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO) if __name__ == '__main__': argv = ap.parse_args() app = SEDbgMuxApp(argv) sys.exit(app.cmdloop())