Compare commits

...

6 Commits

3 changed files with 408 additions and 52 deletions

191
handlers.py Normal file
View File

@ -0,0 +1,191 @@
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022 Vadim Yanitskiy <axilirator@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import socket
import queue
import enum
import abc
import sys
from typing import Any, Optional
from construct import Container
from proto import DbgMuxFrame
class DbgMuxFrameHandler(abc.ABC):
''' Abstract DebugMux frame handler '''
_tx_queue: Optional[queue.Queue] = None
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b''):
''' Called by child classes to send some message '''
assert self._tx_queue is not None
self._tx_queue.put((msg_type, msg))
@abc.abstractmethod
def _handle_frame(self, frame: Container) -> None:
''' Handle the given DebugMux frame '''
class ConnState(enum.Enum):
''' Connection state for DbgMuxConnHandler '''
NotEstablished = enum.auto()
Establishing = enum.auto()
Established = enum.auto()
class DbgMuxConnHandler(DbgMuxFrameHandler):
''' Abstract DebugMux connection handler '''
def __init__(self):
self.conn_state: ConnState = ConnState.NotEstablished
self.ConnRef: int = 0xffff
self.DPRef: int = 0xffff
def establish(self, DPRef: int) -> None:
assert self.conn_state == ConnState.NotEstablished
log.info("Establishing connection with DPRef=0x%04x", DPRef)
self.send(DbgMuxFrame.MsgType.ConnEstablish, dict(DPRef=DPRef))
self.conn_state = ConnState.Establishing
self.DPRef = DPRef
def send_data(self, data: bytes) -> None:
''' Called by child classes to send connection data '''
assert self.conn_state == ConnState.Established
msg = dict(ConnRef=self.ConnRef, Data=data)
self.send(DbgMuxFrame.MsgType.ConnData, msg)
def _match(self, frame: Container,
msg_type: DbgMuxFrame.MsgType,
msg_fields: dict = { }) -> bool:
if frame['MsgType'] != msg_type:
return False
for (key, val) in msg_fields.items():
if key not in frame['Msg']:
return False
if frame['Msg'][key] != val:
return False
return True
def _handle_frame(self, frame: Container) -> None:
''' Handle the given DebugMux frame '''
if self.conn_state == ConnState.Established:
fields = dict(ConnRef=self.ConnRef)
if self._match(frame, DbgMuxFrame.MsgType.ConnData, fields):
self._handle_data(frame['Msg']['Data'])
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
elif self._match(frame, DbgMuxFrame.MsgType.ConnTerminated, fields):
log.info('Connection terminated (DPRef=0x%04x, ConnRef=0x%04x)',
self.DPRef, self.ConnRef)
self.conn_state = ConnState.NotEstablished
self._handle_terminate()
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
elif self.conn_state == ConnState.Establishing:
# Match ConnEstablished with our DPRef and any ConnRef
if self._match(frame, DbgMuxFrame.MsgType.ConnEstablished, dict(DPRef=self.DPRef)):
log.info('Connection established (DPRef=0x%04x, ConnRef=0x%04x)',
self.DPRef, frame['Msg']['ConnRef'])
self.conn_state = ConnState.Established
self.ConnRef = frame['Msg']['ConnRef']
self._handle_establish()
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
@abc.abstractmethod
def _handle_data(self, data: bytes) -> None:
''' Called on reciept of connection data '''
@abc.abstractmethod
def _handle_establish(self) -> None:
''' Called on connection establishment '''
@abc.abstractmethod
def _handle_terminate(self) -> None:
''' Called on connection termination '''
class DbgMuxConnInteractiveTerminal(DbgMuxConnHandler):
def __init__(self, *args, **kw):
self.attached: bool = False
super().__init__(*args)
def attach(self):
self.attached = True
while True:
try:
line = input()
if self.conn_state == ConnState.Established:
self.send_data(bytes(line, 'ascii'))
except (KeyboardInterrupt, EOFError) as e:
break
self.attached = False
def _handle_data(self, data: bytes) -> None:
if self.attached:
sys.stdout.write(data.decode('ascii'))
def _handle_establish(self) -> None:
pass
def _handle_terminate(self) -> None:
pass
class DbgMuxConnUdpBridge(DbgMuxConnHandler):
DGRAM_MAX_LEN: int = 1024
def __init__(self, *args, **kw):
super().__init__(*args)
self.raddr: str = kw.get('raddr', '127.0.0.1')
self.rport: int = kw.get('rport', 9999)
self.laddr: str = kw.get('laddr', '127.0.0.1')
self.lport: int = kw.get('lport', 8888)
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind((self.laddr, self.lport))
self._sock.settimeout(0.5)
self._thread = threading.Thread(target=self._worker,
daemon=True)
self._shutdown = threading.Event()
def _worker(self) -> None:
while not self._shutdown.is_set():
try:
(data, addr) = self._sock.recvfrom(self.DGRAM_MAX_LEN)
if self.conn_state == ConnState.Established:
self.send_data(data)
except TimeoutError:
pass
def _handle_data(self, data: bytes) -> None:
self._sock.sendto(data, (self.raddr, self.rport))
def _handle_establish(self) -> None:
self._shutdown.clear()
self._thread.start()
def _handle_terminate(self) -> None:
self._shutdown.set()

123
peer.py
View File

@ -17,21 +17,138 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import queue
from typing import Any, Optional
from construct import Const, Container, Int16ul
from handlers import DbgMuxFrameHandler
from transport import Transport
from proto import DbgMuxFrame
class DbgMuxFrameDisp:
''' DebugMux frame dispatcher '''
def __init__(self, rxq: queue.Queue, txq: queue.Queue):
self.__hlist = list()
self._rx_queue = rxq
self._tx_queue = txq
self._thread = threading.Thread(target=self._worker,
name='DbgMuxPeer-Disp',
daemon=True)
self._shutdown = threading.Event()
def start(self) -> None:
self._shutdown.clear()
self._thread.start()
def stop(self) -> None:
self._shutdown.set()
self._thread.join()
def find_by_name(self, name: str) -> Optional[DbgMuxFrameHandler]:
for (hname, hinst) in self.__hlist:
if hname == name:
return hinst
return None
def register(self, inst: DbgMuxFrameHandler,
name: Optional[str] = None) -> None:
if name is not None and self.find_by_name(name):
raise FileExistsError
self.__hlist.append((name, inst))
# Give a handler access to the Tx queue
inst._tx_queue = self._tx_queue
def unregister(self, name: str):
inst = self.find_by_name(name)
if inst is None:
raise FileNotFoundError
self.__hlist.remove((name, inst))
def _worker(self) -> None:
while not self._shutdown.is_set():
try:
frame = self._rx_queue.get(block=True, timeout=0.5)
self._dispatch(frame)
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _dispatch(self, frame: DbgMuxFrame) -> None:
for (hname, hinst) in self.__hlist:
try:
hinst._handle_frame(frame)
except StopIteration:
return
except Exception as e:
log.error("Handler '%s' raised an exception: %s", hname, e)
# TODO: remove this handler?
log.error("DebugMux message %s was not handled, dropping...", frame['Msg'])
class DbgMuxPeer:
def __init__(self, io: Transport):
self.tx_count: int = 0
self.rx_count: int = 0
self.io = io
# Threads handling Rx/Tx frames
self._rx_thread = threading.Thread(target=self._rx_worker,
name='DbgMuxPeer-Rx',
daemon=True)
self._tx_thread = threading.Thread(target=self._tx_worker,
name='DbgMuxPeer-Tx',
daemon=True)
self._shutdown = threading.Event()
# Internal queues for Rx/Tx frames
self._rx_queue = queue.Queue()
self._tx_queue = queue.Queue()
# Init frame dispatcher
self.disp = DbgMuxFrameDisp(self._rx_queue,
self._tx_queue)
def start(self) -> None:
self._shutdown.clear()
self._rx_thread.start()
self._tx_thread.start()
self.disp.start()
def stop(self) -> None:
# Set the shutdown event
self._shutdown.set()
# Wait for both threads to terminate
self._tx_thread.join()
self._rx_thread.join()
self.disp.stop()
def _rx_worker(self) -> None:
while not self._shutdown.is_set():
frame = self._recv() # blocking until timeout
if frame is not None:
self._rx_queue.put(frame)
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _tx_worker(self) -> None:
while not self._shutdown.is_set():
try:
(msg_type, msg) = self._tx_queue.get(block=True, timeout=0.5)
self._send(msg_type, msg)
self._tx_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
''' Send a single message (non-blocking call) '''
self._tx_queue.put((msg_type, msg))
def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)
@ -63,7 +180,11 @@ class DbgMuxPeer:
if msg_type != DbgMuxFrame.MsgType.Ack:
self.tx_count += 1
def recv(self) -> Optional[Container]:
def recv(self, timeout: Optional[float] = None) -> Container:
''' Receive a single message (blocking call) '''
return self._rx_queue.get(block=True, timeout=timeout)
def _recv(self) -> Optional[Container]:
frame: bytes = b''
frame += self.io.read(2) # Magic
if frame == b'':

View File

@ -23,11 +23,74 @@ import argparse
import cmd2
import enum
import sys
import time
from transport import TransportModem
from proto import DbgMuxFrame
from peer import DbgMuxPeer
from construct import Container
from handlers import DbgMuxFrameHandler
from handlers import DbgMuxConnUdpBridge
from handlers import DbgMuxConnInteractiveTerminal
class CommonFrameHandler(DbgMuxFrameHandler):
''' Handles some common messages '''
def __init__(self, cmd2):
self.cmd2 = cmd2
def _handle_frame(self, frame: Container) -> None:
if frame['MsgType'] == DbgMuxFrame.MsgType.Ident:
log.info("Identified target: '%s', IMEI=%s",
frame['Msg']['Ident'][:-15],
frame['Msg']['Ident'][-15:])
elif frame['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce:
log.info("Data Provider available (DPRef=0x%04x): '%s'",
frame['Msg']['DPRef'], frame['Msg']['Name'])
elif frame['MsgType'] == DbgMuxFrame.MsgType.FlowControl:
log.warning("Rx FlowControl message, which is not yet supported")
elif frame['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished \
and frame['Msg']['ConnRef'] == 0xffff:
log.warning("Connection establishment failed, "
"no such DPRef=0x%04x?", frame['Msg']['DPRef'])
elif frame['MsgType'] == DbgMuxFrame.MsgType.ConnTerminated \
and frame['Msg']['ConnRef'] == 0xffff:
log.warning("Connection termination failed, "
"no such DPRef=0x%04x?", frame['Msg']['DPRef'])
elif frame['MsgType'] == DbgMuxFrame.MsgType.Ack:
raise StopIteration
else: # To be consumed by other handlers
return
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
class PingPongHandler(DbgMuxFrameHandler):
''' Handles DbgMuxFrame.MsgType.Pong '''
def __init__(self):
self.expect_pong: bool = False
def ping(self, payload: str):
log.info('Tx Ping with payload \'%s\'', payload)
self.send(DbgMuxFrame.MsgType.Ping, payload)
self.expect_pong = True
# TODO: start a timer?
def handle_pong(self, payload: str):
if not self.expect_pong:
log.warning('Rx unexpected Pong, sending ACK anyway')
return
log.info('Rx Pong with payload \'%s\'', payload)
self.expect_pong = False
def _handle_frame(self, frame: Container) -> None:
if frame['MsgType'] == DbgMuxFrame.MsgType.Pong:
self.handle_pong(frame['Msg'])
self.send(DbgMuxFrame.MsgType.Ack)
raise StopIteration
class SEDbgMuxApp(cmd2.Cmd):
DESC = 'DebugMux client for [Sony] Ericsson phones and modems'
@ -48,6 +111,10 @@ class SEDbgMuxApp(cmd2.Cmd):
self.transport = TransportModem(self.argv)
self.peer = DbgMuxPeer(self.transport)
# Register DebugMux frame handlers
self.peer.disp.register(CommonFrameHandler(self), 'Common')
self.peer.disp.register(PingPongHandler(), 'PingPong')
# Modem connection state
self.set_connected(False)
@ -63,11 +130,13 @@ class SEDbgMuxApp(cmd2.Cmd):
def do_connect(self, opts) -> None:
''' Connect to the modem and switch it to DebugMux mode '''
self.transport.connect()
self.peer.start()
self.set_connected(True)
@cmd2.with_category(CATEGORY_CONN)
def do_disconnect(self, opts) -> None:
''' Disconnect from the modem '''
self.peer.stop()
self.transport.disconnect()
self.set_connected(False)
@ -86,23 +155,8 @@ class SEDbgMuxApp(cmd2.Cmd):
def do_enquiry(self, opts) -> None:
''' Enquiry target identifier and available Data Providers '''
self.peer.send(DbgMuxFrame.MsgType.Enquiry)
while True:
f = self.peer.recv()
if f['MsgType'] == DbgMuxFrame.MsgType.Ident:
log.info("Identified target: '%s', IMEI=%s",
f['Msg']['Ident'][:-15],
f['Msg']['Ident'][-15:])
elif f['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce:
log.info("Data Provider available (DPRef=0x%04x): '%s'",
f['Msg']['DPRef'], f['Msg']['Name'])
# No more data in the buffer
# FIXME: layer violation!
if self.transport._sl.in_waiting == 0:
break
# ACKnowledge reception of the info
self.peer.send(DbgMuxFrame.MsgType.Ack)
# The responce to be handled by CommonFrameHandler
time.sleep(0.5)
ping_parser = cmd2.Cmd2ArgumentParser()
ping_parser.add_argument('-p', '--payload',
@ -113,52 +167,42 @@ class SEDbgMuxApp(cmd2.Cmd):
@cmd2.with_category(CATEGORY_DBGMUX)
def do_ping(self, opts) -> None:
''' Send a Ping to the target, expect Pong '''
log.info('Tx Ping with payload \'%s\'', opts.payload)
self.peer.send(DbgMuxFrame.MsgType.Ping, opts.payload)
f = self.peer.recv()
assert f['MsgType'] == DbgMuxFrame.MsgType.Pong
log.info('Rx Pong with payload \'%s\'', f['Msg'])
self.peer.send(DbgMuxFrame.MsgType.Ack)
hinst = self.peer.disp.find_by_name('PingPong')
hinst.ping(opts.payload)
establish_parser = cmd2.Cmd2ArgumentParser()
establish_parser.add_argument('DPRef',
type=lambda v: int(v, 16),
help='DPRef of a Data Provider in hex')
establish_parser.add_argument('mode',
choices=['interactive', 'udp-bridge'],
help='Connection mode')
@cmd2.with_argparser(establish_parser)
@cmd2.with_category(CATEGORY_DBGMUX)
def do_establish(self, opts) -> None:
''' Establish connection with a Data Provider '''
log.info("Establishing connection with DPRef=0x%04x", opts.DPRef)
self.peer.send(DbgMuxFrame.MsgType.ConnEstablish,
dict(DPRef=opts.DPRef))
if opts.mode == 'interactive':
ch = DbgMuxConnInteractiveTerminal()
self.peer.disp.register(ch)
ch.establish(opts.DPRef)
ch.attach()
elif opts.mode == 'udp-bridge':
ch = DbgMuxConnUdpBridge()
self.peer.disp.register(ch)
ch.establish(opts.DPRef)
f = self.peer.recv()
assert f['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished
if f['Msg']['ConnRef'] == 0xffff:
log.warning("Connection failed: unknown DPRef=0x%04x?", opts.DPRef)
self.peer.send(DbgMuxFrame.MsgType.Ack)
return
terminate_parser = cmd2.Cmd2ArgumentParser()
terminate_parser.add_argument('ConnRef',
type=lambda v: int(v, 16),
help='ConnRef in hex')
log.info("Connection established (ConnRef=0x%04x)",
f['Msg']['ConnRef'])
# Read the messages
while True:
f = self.peer.recv()
if f['MsgType'] != DbgMuxFrame.MsgType.ConnData:
log.warning('Unexpected frame: %s', f)
self.peer.send(DbgMuxFrame.MsgType.Ack)
continue
try: # FIXME: there can be binary data
self.stdout.write(f['Msg']['Data'].decode())
except: # ... ignore it for now
continue
# ACKnowledge reception of a frame
self.peer.send(DbgMuxFrame.MsgType.Ack)
@cmd2.with_argparser(terminate_parser)
@cmd2.with_category(CATEGORY_DBGMUX)
def do_terminate(self, opts) -> None:
''' Terminate connection with a Data Provider '''
msg = dict(ConnRef=opts.ConnRef)
self.peer.send(DbgMuxFrame.MsgType.ConnTerminate, msg)
ap = argparse.ArgumentParser(prog='sedbgmux', description=SEDbgMuxApp.DESC,