Compare commits

...

6 Commits

3 changed files with 408 additions and 52 deletions

191
handlers.py Normal file
View File

@ -0,0 +1,191 @@
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022 Vadim Yanitskiy <axilirator@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import socket
import queue
import enum
import abc
import sys
from typing import Any, Optional
from construct import Container
from proto import DbgMuxFrame
class DbgMuxFrameHandler(abc.ABC):
''' Abstract DebugMux frame handler '''
_tx_queue: Optional[queue.Queue] = None
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b''):
''' Called by child classes to send some message '''
assert self._tx_queue is not None
self._tx_queue.put((msg_type, msg))
@abc.abstractmethod
def _handle_frame(self, frame: Container) -> None:
''' Handle the given DebugMux frame '''
class ConnState(enum.Enum):
''' Connection state for DbgMuxConnHandler '''
NotEstablished = enum.auto()
Establishing = enum.auto()
Established = enum.auto()
class DbgMuxConnHandler(DbgMuxFrameHandler):
''' Abstract DebugMux connection handler '''
def __init__(self):
self.conn_state: ConnState = ConnState.NotEstablished
self.ConnRef: int = 0xffff
self.DPRef: int = 0xffff
def establish(self, DPRef: int) -> None:
assert self.conn_state == ConnState.NotEstablished
log.info("Establishing connection with DPRef=0x%04x", DPRef)
self.send(DbgMuxFrame.MsgType.ConnEstablish, dict(DPRef=DPRef))
self.conn_state = ConnState.Establishing
self.DPRef = DPRef
def send_data(self, data: bytes) -> None:
''' Called by child classes to send connection data '''
assert self.conn_state == ConnState.Established
msg = dict(ConnRef=self.ConnRef, Data=data)
self.send(DbgMuxFrame.MsgType.ConnData, msg)
def _match(self, frame: Container,
msg_type: DbgMuxFrame.MsgType,
msg_fields: dict = { }) -> bool:
if frame['MsgType'] != msg_type:
return False
for (key, val) in msg_fields.items():
if key not in frame['Msg']:
return False
if frame['Msg'][key] != val:
return False
return True
def _handle_frame(self, frame: Container) -> None:
''' Handle the given DebugMux frame '''
if self.conn_state == ConnState.Established:
fields = dict(ConnRef=self.ConnRef)
if self._match(frame, DbgMuxFrame.MsgType.ConnData, fields):
self._handle_data(frame['Msg']['Data'])
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
elif self._match(frame, DbgMuxFrame.MsgType.ConnTerminated, fields):
log.info('Connection terminated (DPRef=0x%04x, ConnRef=0x%04x)',
self.DPRef, self.ConnRef)
self.conn_state = ConnState.NotEstablished
self._handle_terminate()
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
elif self.conn_state == ConnState.Establishing:
# Match ConnEstablished with our DPRef and any ConnRef
if self._match(frame, DbgMuxFrame.MsgType.ConnEstablished, dict(DPRef=self.DPRef)):
log.info('Connection established (DPRef=0x%04x, ConnRef=0x%04x)',
self.DPRef, frame['Msg']['ConnRef'])
self.conn_state = ConnState.Established
self.ConnRef = frame['Msg']['ConnRef']
self._handle_establish()
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
@abc.abstractmethod
def _handle_data(self, data: bytes) -> None:
''' Called on reciept of connection data '''
@abc.abstractmethod
def _handle_establish(self) -> None:
''' Called on connection establishment '''
@abc.abstractmethod
def _handle_terminate(self) -> None:
''' Called on connection termination '''
class DbgMuxConnInteractiveTerminal(DbgMuxConnHandler):
def __init__(self, *args, **kw):
self.attached: bool = False
super().__init__(*args)
def attach(self):
self.attached = True
while True:
try:
line = input()
if self.conn_state == ConnState.Established:
self.send_data(bytes(line, 'ascii'))
except (KeyboardInterrupt, EOFError) as e:
break
self.attached = False
def _handle_data(self, data: bytes) -> None:
if self.attached:
sys.stdout.write(data.decode('ascii'))
def _handle_establish(self) -> None:
pass
def _handle_terminate(self) -> None:
pass
class DbgMuxConnUdpBridge(DbgMuxConnHandler):
DGRAM_MAX_LEN: int = 1024
def __init__(self, *args, **kw):
super().__init__(*args)
self.raddr: str = kw.get('raddr', '127.0.0.1')
self.rport: int = kw.get('rport', 9999)
self.laddr: str = kw.get('laddr', '127.0.0.1')
self.lport: int = kw.get('lport', 8888)
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind((self.laddr, self.lport))
self._sock.settimeout(0.5)
self._thread = threading.Thread(target=self._worker,
daemon=True)
self._shutdown = threading.Event()
def _worker(self) -> None:
while not self._shutdown.is_set():
try:
(data, addr) = self._sock.recvfrom(self.DGRAM_MAX_LEN)
if self.conn_state == ConnState.Established:
self.send_data(data)
except TimeoutError:
pass
def _handle_data(self, data: bytes) -> None:
self._sock.sendto(data, (self.raddr, self.rport))
def _handle_establish(self) -> None:
self._shutdown.clear()
self._thread.start()
def _handle_terminate(self) -> None:
self._shutdown.set()

123
peer.py
View File

@ -17,21 +17,138 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log import logging as log
import threading
import queue
from typing import Any, Optional from typing import Any, Optional
from construct import Const, Container, Int16ul from construct import Const, Container, Int16ul
from handlers import DbgMuxFrameHandler
from transport import Transport from transport import Transport
from proto import DbgMuxFrame from proto import DbgMuxFrame
class DbgMuxFrameDisp:
''' DebugMux frame dispatcher '''
def __init__(self, rxq: queue.Queue, txq: queue.Queue):
self.__hlist = list()
self._rx_queue = rxq
self._tx_queue = txq
self._thread = threading.Thread(target=self._worker,
name='DbgMuxPeer-Disp',
daemon=True)
self._shutdown = threading.Event()
def start(self) -> None:
self._shutdown.clear()
self._thread.start()
def stop(self) -> None:
self._shutdown.set()
self._thread.join()
def find_by_name(self, name: str) -> Optional[DbgMuxFrameHandler]:
for (hname, hinst) in self.__hlist:
if hname == name:
return hinst
return None
def register(self, inst: DbgMuxFrameHandler,
name: Optional[str] = None) -> None:
if name is not None and self.find_by_name(name):
raise FileExistsError
self.__hlist.append((name, inst))
# Give a handler access to the Tx queue
inst._tx_queue = self._tx_queue
def unregister(self, name: str):
inst = self.find_by_name(name)
if inst is None:
raise FileNotFoundError
self.__hlist.remove((name, inst))
def _worker(self) -> None:
while not self._shutdown.is_set():
try:
frame = self._rx_queue.get(block=True, timeout=0.5)
self._dispatch(frame)
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _dispatch(self, frame: DbgMuxFrame) -> None:
for (hname, hinst) in self.__hlist:
try:
hinst._handle_frame(frame)
except StopIteration:
return
except Exception as e:
log.error("Handler '%s' raised an exception: %s", hname, e)
# TODO: remove this handler?
log.error("DebugMux message %s was not handled, dropping...", frame['Msg'])
class DbgMuxPeer: class DbgMuxPeer:
def __init__(self, io: Transport): def __init__(self, io: Transport):
self.tx_count: int = 0 self.tx_count: int = 0
self.rx_count: int = 0 self.rx_count: int = 0
self.io = io self.io = io
# Threads handling Rx/Tx frames
self._rx_thread = threading.Thread(target=self._rx_worker,
name='DbgMuxPeer-Rx',
daemon=True)
self._tx_thread = threading.Thread(target=self._tx_worker,
name='DbgMuxPeer-Tx',
daemon=True)
self._shutdown = threading.Event()
# Internal queues for Rx/Tx frames
self._rx_queue = queue.Queue()
self._tx_queue = queue.Queue()
# Init frame dispatcher
self.disp = DbgMuxFrameDisp(self._rx_queue,
self._tx_queue)
def start(self) -> None:
self._shutdown.clear()
self._rx_thread.start()
self._tx_thread.start()
self.disp.start()
def stop(self) -> None:
# Set the shutdown event
self._shutdown.set()
# Wait for both threads to terminate
self._tx_thread.join()
self._rx_thread.join()
self.disp.stop()
def _rx_worker(self) -> None:
while not self._shutdown.is_set():
frame = self._recv() # blocking until timeout
if frame is not None:
self._rx_queue.put(frame)
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _tx_worker(self) -> None:
while not self._shutdown.is_set():
try:
(msg_type, msg) = self._tx_queue.get(block=True, timeout=0.5)
self._send(msg_type, msg)
self._tx_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
''' Send a single message (non-blocking call) '''
self._tx_queue.put((msg_type, msg))
def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first # Encode the inner message first
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type) msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)
@ -63,7 +180,11 @@ class DbgMuxPeer:
if msg_type != DbgMuxFrame.MsgType.Ack: if msg_type != DbgMuxFrame.MsgType.Ack:
self.tx_count += 1 self.tx_count += 1
def recv(self) -> Optional[Container]: def recv(self, timeout: Optional[float] = None) -> Container:
''' Receive a single message (blocking call) '''
return self._rx_queue.get(block=True, timeout=timeout)
def _recv(self) -> Optional[Container]:
frame: bytes = b'' frame: bytes = b''
frame += self.io.read(2) # Magic frame += self.io.read(2) # Magic
if frame == b'': if frame == b'':

View File

@ -23,11 +23,74 @@ import argparse
import cmd2 import cmd2
import enum import enum
import sys import sys
import time
from transport import TransportModem from transport import TransportModem
from proto import DbgMuxFrame from proto import DbgMuxFrame
from peer import DbgMuxPeer from peer import DbgMuxPeer
from construct import Container
from handlers import DbgMuxFrameHandler
from handlers import DbgMuxConnUdpBridge
from handlers import DbgMuxConnInteractiveTerminal
class CommonFrameHandler(DbgMuxFrameHandler):
''' Handles some common messages '''
def __init__(self, cmd2):
self.cmd2 = cmd2
def _handle_frame(self, frame: Container) -> None:
if frame['MsgType'] == DbgMuxFrame.MsgType.Ident:
log.info("Identified target: '%s', IMEI=%s",
frame['Msg']['Ident'][:-15],
frame['Msg']['Ident'][-15:])
elif frame['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce:
log.info("Data Provider available (DPRef=0x%04x): '%s'",
frame['Msg']['DPRef'], frame['Msg']['Name'])
elif frame['MsgType'] == DbgMuxFrame.MsgType.FlowControl:
log.warning("Rx FlowControl message, which is not yet supported")
elif frame['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished \
and frame['Msg']['ConnRef'] == 0xffff:
log.warning("Connection establishment failed, "
"no such DPRef=0x%04x?", frame['Msg']['DPRef'])
elif frame['MsgType'] == DbgMuxFrame.MsgType.ConnTerminated \
and frame['Msg']['ConnRef'] == 0xffff:
log.warning("Connection termination failed, "
"no such DPRef=0x%04x?", frame['Msg']['DPRef'])
elif frame['MsgType'] == DbgMuxFrame.MsgType.Ack:
raise StopIteration
else: # To be consumed by other handlers
return
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
class PingPongHandler(DbgMuxFrameHandler):
''' Handles DbgMuxFrame.MsgType.Pong '''
def __init__(self):
self.expect_pong: bool = False
def ping(self, payload: str):
log.info('Tx Ping with payload \'%s\'', payload)
self.send(DbgMuxFrame.MsgType.Ping, payload)
self.expect_pong = True
# TODO: start a timer?
def handle_pong(self, payload: str):
if not self.expect_pong:
log.warning('Rx unexpected Pong, sending ACK anyway')
return
log.info('Rx Pong with payload \'%s\'', payload)
self.expect_pong = False
def _handle_frame(self, frame: Container) -> None:
if frame['MsgType'] == DbgMuxFrame.MsgType.Pong:
self.handle_pong(frame['Msg'])
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
class SEDbgMuxApp(cmd2.Cmd): class SEDbgMuxApp(cmd2.Cmd):
DESC = 'DebugMux client for [Sony] Ericsson phones and modems' DESC = 'DebugMux client for [Sony] Ericsson phones and modems'
@ -48,6 +111,10 @@ class SEDbgMuxApp(cmd2.Cmd):
self.transport = TransportModem(self.argv) self.transport = TransportModem(self.argv)
self.peer = DbgMuxPeer(self.transport) self.peer = DbgMuxPeer(self.transport)
# Register DebugMux frame handlers
self.peer.disp.register(CommonFrameHandler(self), 'Common')
self.peer.disp.register(PingPongHandler(), 'PingPong')
# Modem connection state # Modem connection state
self.set_connected(False) self.set_connected(False)
@ -63,11 +130,13 @@ class SEDbgMuxApp(cmd2.Cmd):
def do_connect(self, opts) -> None: def do_connect(self, opts) -> None:
''' Connect to the modem and switch it to DebugMux mode ''' ''' Connect to the modem and switch it to DebugMux mode '''
self.transport.connect() self.transport.connect()
self.peer.start()
self.set_connected(True) self.set_connected(True)
@cmd2.with_category(CATEGORY_CONN) @cmd2.with_category(CATEGORY_CONN)
def do_disconnect(self, opts) -> None: def do_disconnect(self, opts) -> None:
''' Disconnect from the modem ''' ''' Disconnect from the modem '''
self.peer.stop()
self.transport.disconnect() self.transport.disconnect()
self.set_connected(False) self.set_connected(False)
@ -86,23 +155,8 @@ class SEDbgMuxApp(cmd2.Cmd):
def do_enquiry(self, opts) -> None: def do_enquiry(self, opts) -> None:
''' Enquiry target identifier and available Data Providers ''' ''' Enquiry target identifier and available Data Providers '''
self.peer.send(DbgMuxFrame.MsgType.Enquiry) self.peer.send(DbgMuxFrame.MsgType.Enquiry)
while True: # The responce to be handled by CommonFrameHandler
f = self.peer.recv() time.sleep(0.5)
if f['MsgType'] == DbgMuxFrame.MsgType.Ident:
log.info("Identified target: '%s', IMEI=%s",
f['Msg']['Ident'][:-15],
f['Msg']['Ident'][-15:])
elif f['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce:
log.info("Data Provider available (DPRef=0x%04x): '%s'",
f['Msg']['DPRef'], f['Msg']['Name'])
# No more data in the buffer
# FIXME: layer violation!
if self.transport._sl.in_waiting == 0:
break
# ACKnowledge reception of the info
self.peer.send(DbgMuxFrame.MsgType.Ack)
ping_parser = cmd2.Cmd2ArgumentParser() ping_parser = cmd2.Cmd2ArgumentParser()
ping_parser.add_argument('-p', '--payload', ping_parser.add_argument('-p', '--payload',
@ -113,52 +167,42 @@ class SEDbgMuxApp(cmd2.Cmd):
@cmd2.with_category(CATEGORY_DBGMUX) @cmd2.with_category(CATEGORY_DBGMUX)
def do_ping(self, opts) -> None: def do_ping(self, opts) -> None:
''' Send a Ping to the target, expect Pong ''' ''' Send a Ping to the target, expect Pong '''
log.info('Tx Ping with payload \'%s\'', opts.payload) hinst = self.peer.disp.find_by_name('PingPong')
self.peer.send(DbgMuxFrame.MsgType.Ping, opts.payload) hinst.ping(opts.payload)
f = self.peer.recv()
assert f['MsgType'] == DbgMuxFrame.MsgType.Pong
log.info('Rx Pong with payload \'%s\'', f['Msg'])
self.peer.send(DbgMuxFrame.MsgType.Ack)
establish_parser = cmd2.Cmd2ArgumentParser() establish_parser = cmd2.Cmd2ArgumentParser()
establish_parser.add_argument('DPRef', establish_parser.add_argument('DPRef',
type=lambda v: int(v, 16), type=lambda v: int(v, 16),
help='DPRef of a Data Provider in hex') help='DPRef of a Data Provider in hex')
establish_parser.add_argument('mode',
choices=['interactive', 'udp-bridge'],
help='Connection mode')
@cmd2.with_argparser(establish_parser) @cmd2.with_argparser(establish_parser)
@cmd2.with_category(CATEGORY_DBGMUX) @cmd2.with_category(CATEGORY_DBGMUX)
def do_establish(self, opts) -> None: def do_establish(self, opts) -> None:
''' Establish connection with a Data Provider ''' ''' Establish connection with a Data Provider '''
log.info("Establishing connection with DPRef=0x%04x", opts.DPRef) if opts.mode == 'interactive':
self.peer.send(DbgMuxFrame.MsgType.ConnEstablish, ch = DbgMuxConnInteractiveTerminal()
dict(DPRef=opts.DPRef)) self.peer.disp.register(ch)
ch.establish(opts.DPRef)
ch.attach()
elif opts.mode == 'udp-bridge':
ch = DbgMuxConnUdpBridge()
self.peer.disp.register(ch)
ch.establish(opts.DPRef)
f = self.peer.recv() terminate_parser = cmd2.Cmd2ArgumentParser()
assert f['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished terminate_parser.add_argument('ConnRef',
if f['Msg']['ConnRef'] == 0xffff: type=lambda v: int(v, 16),
log.warning("Connection failed: unknown DPRef=0x%04x?", opts.DPRef) help='ConnRef in hex')
self.peer.send(DbgMuxFrame.MsgType.Ack)
return
log.info("Connection established (ConnRef=0x%04x)", @cmd2.with_argparser(terminate_parser)
f['Msg']['ConnRef']) @cmd2.with_category(CATEGORY_DBGMUX)
def do_terminate(self, opts) -> None:
# Read the messages ''' Terminate connection with a Data Provider '''
while True: msg = dict(ConnRef=opts.ConnRef)
f = self.peer.recv() self.peer.send(DbgMuxFrame.MsgType.ConnTerminate, msg)
if f['MsgType'] != DbgMuxFrame.MsgType.ConnData:
log.warning('Unexpected frame: %s', f)
self.peer.send(DbgMuxFrame.MsgType.Ack)
continue
try: # FIXME: there can be binary data
self.stdout.write(f['Msg']['Data'].decode())
except: # ... ignore it for now
continue
# ACKnowledge reception of a frame
self.peer.send(DbgMuxFrame.MsgType.Ack)
ap = argparse.ArgumentParser(prog='sedbgmux', description=SEDbgMuxApp.DESC, ap = argparse.ArgumentParser(prog='sedbgmux', description=SEDbgMuxApp.DESC,