Compare commits

...

9 Commits

Author SHA1 Message Date
Vadim Yanitskiy de939b373a SEDbgMuxApp: tab-completion for DPRef and ConnRef values 2023-01-22 23:53:17 +06:00
Vadim Yanitskiy 97f3de3564 DbgMuxClient: client role implementation 2023-01-22 23:20:02 +06:00
Vadim Yanitskiy afed084c77 DbgMuxConnHandler: proper handling of the FlowControl message
A FlowControl message basically indicates how many data blocks the
remote side is able to receive from us.  Whenever the block limit is
reached, we should queue outgoing data blocks until the remote side
has given us a new quote by sending another FlowControl message.
2023-01-22 22:36:36 +06:00
Vadim Yanitskiy fa71a8a40d DbgMuxConnHandler: implement DbgMuxConnFileLogger 2023-01-22 22:36:34 +06:00
Vadim Yanitskiy 27f6cf8f31 DbgMuxConnHandler: implement DbgMuxConnTerminal 2023-01-22 22:36:20 +06:00
Vadim Yanitskiy 23dec6fac7 DbgMuxConnHandler: implement DbgMuxConnUdpProxy 2023-01-22 22:36:08 +06:00
Vadim Yanitskiy 23b560197f DbgMuxConnHandler: new abstract class for connection handlers 2023-01-22 22:34:42 +06:00
Vadim Yanitskiy 2b0342d29b DbgMuxPeer: spawn two threads for handling Rx/Tx messages 2023-01-18 01:44:38 +06:00
Vadim Yanitskiy cf22e4a697 sedbgmux-shell.py: print all logging messages with CR
Let the logging messages overwrite the cmd2 prompt!
2023-01-18 01:44:35 +06:00
10 changed files with 800 additions and 59 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022 Vadim Yanitskiy <axilirator@gmail.com>
# Copyright (c) 2022-2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
@ -23,10 +23,16 @@ import argparse
import cmd2
import sys
from typing import List
from sedbgmux.io import DbgMuxIOModem
from sedbgmux.io import DumpIONative
from sedbgmux import DbgMuxFrame
from sedbgmux import DbgMuxPeer
from sedbgmux import DbgMuxClient
from sedbgmux.ch import DbgMuxConnTerminal
from sedbgmux.ch import DbgMuxConnFileLogger
from sedbgmux.ch import DbgMuxConnUdpProxy
class SEDbgMuxApp(cmd2.Cmd):
@ -48,9 +54,10 @@ class SEDbgMuxApp(cmd2.Cmd):
self.default_category = 'Built-in commands'
self.argv = argv
# Init the I/O layer and DebugMux peer
# Init the I/O layer, DebugMux peer and client
self.io = DbgMuxIOModem(self.argv)
self.peer = DbgMuxPeer(self.io)
self.client = DbgMuxClient(self.peer)
# Optionally dump DebugMux frames to a file
if argv.dump_file is not None:
@ -60,6 +67,16 @@ class SEDbgMuxApp(cmd2.Cmd):
# Modem connection state
self.set_connected(False)
def _tab_data_providers(self) -> List[cmd2.CompletionItem]:
''' Generate a list of DPRef values for tab-completion '''
return [cmd2.CompletionItem('0x%02x' % DPRef, DPName)
for DPRef, DPName in self.client.data_providers.items()]
def _tab_connections(self) -> List[cmd2.CompletionItem]:
''' Generate a list of ConnRef values for tab-completion '''
return [cmd2.CompletionItem('0x%02x' % ConnRef, 'DPRef=%02x %s' % ConnInfo)
for ConnRef, ConnInfo in self.client.active_conn.items()]
def set_connected(self, state: bool) -> None:
self.connected: bool = state
if self.connected:
@ -72,11 +89,15 @@ class SEDbgMuxApp(cmd2.Cmd):
def do_connect(self, opts) -> None:
''' Connect to the modem and switch it to DebugMux mode '''
self.io.connect()
self.peer.start()
self.client.start()
self.set_connected(True)
@cmd2.with_category(CATEGORY_CONN)
def do_disconnect(self, opts) -> None:
''' Disconnect from the modem '''
self.client.stop()
self.peer.stop()
self.io.disconnect()
self.set_connected(False)
@ -91,27 +112,34 @@ class SEDbgMuxApp(cmd2.Cmd):
self.poutput('TxCount (Ns): %d' % self.peer.tx_count)
self.poutput('RxCount (Nr): %d' % self.peer.rx_count)
show_parser = cmd2.Cmd2ArgumentParser()
show_sparser = show_parser.add_subparsers(dest='command', required=True)
show_sparser.add_parser('target-info')
show_sparser.add_parser('data-providers')
show_sparser.add_parser('connections')
@cmd2.with_argparser(show_parser)
@cmd2.with_category(CATEGORY_CONN)
def do_show(self, opts) -> None:
''' Show various information '''
if opts.command == 'target-info':
self.poutput('Name: ' + (self.client.target_name or '(unknown)'))
self.poutput('IMEI: ' + (self.client.target_imei or '(unknown)'))
elif opts.command == 'data-providers':
for (DPRef, DPName) in self.client.data_providers.items():
self.poutput('Data Provider (DPRef=0x%02x): %s' % (DPRef, DPName))
elif opts.command == 'connections':
for (ConnRef, ConnInfo) in self.client.active_conn.items():
(DPRef, ch) = ConnInfo
self.poutput('Connection (DPRef=0x%02x, ConnRef=0x%02x): %s'
% (DPRef, ConnRef, str(ch)))
for (DPRef, ch) in self.client.pending_conn.items():
self.poutput('Pending Connection (DPRef=0x%02x): %s' % (DPRef, str(ch)))
@cmd2.with_category(CATEGORY_DBGMUX)
def do_enquiry(self, opts) -> None:
''' Enquiry target identifier and available Data Providers '''
self.peer.send(DbgMuxFrame.MsgType.Enquiry)
while True:
f = self.peer.recv()
if f['MsgType'] == DbgMuxFrame.MsgType.Ident:
log.info("Identified target: '%s', IMEI=%s",
f['Msg']['Ident'][:-15],
f['Msg']['Ident'][-15:])
elif f['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce:
log.info("Data Provider available (DPRef=0x%04x): '%s'",
f['Msg']['DPRef'], f['Msg']['Name'])
# No more data in the buffer
# FIXME: layer violation!
if self.io._sl.in_waiting == 0:
break
# ACKnowledge reception of the info
self.peer.send(DbgMuxFrame.MsgType.Ack)
self.client.enquiry()
ping_parser = cmd2.Cmd2ArgumentParser()
ping_parser.add_argument('-p', '--payload',
@ -122,53 +150,64 @@ class SEDbgMuxApp(cmd2.Cmd):
@cmd2.with_category(CATEGORY_DBGMUX)
def do_ping(self, opts) -> None:
''' Send a Ping to the target, expect Pong '''
log.info('Tx Ping with payload \'%s\'', opts.payload)
self.peer.send(DbgMuxFrame.MsgType.Ping, opts.payload)
f = self.peer.recv()
assert f['MsgType'] == DbgMuxFrame.MsgType.Pong
log.info('Rx Pong with payload \'%s\'', f['Msg'])
self.peer.send(DbgMuxFrame.MsgType.Ack)
self.client.ping(opts.payload)
establish_parser = cmd2.Cmd2ArgumentParser()
establish_parser.add_argument('DPRef',
type=lambda v: int(v, 16),
choices_provider=_tab_data_providers,
help='DPRef of a Data Provider in hex')
establish_sparser = establish_parser.add_subparsers(dest='handler', required=True,
help='Connection handler')
ch_terminal = establish_sparser.add_parser('terminal',
help=DbgMuxConnTerminal.__doc__)
ch_file_logger = establish_sparser.add_parser('file-logger',
help=DbgMuxConnFileLogger.__doc__)
ch_file_logger.add_argument('FILE', type=argparse.FileType('ab', 0),
completer=cmd2.Cmd.path_complete,
help='File name or \'-\' for stdout')
ch_udp_proxy = establish_sparser.add_parser('udp-proxy',
help=DbgMuxConnUdpProxy.__doc__)
ch_udp_proxy.add_argument('-la', '--local-addr', dest='laddr', type=str,
default=DbgMuxConnUdpProxy.LADDR_DEF[0],
help='Local address (default: %(default)s)')
ch_udp_proxy.add_argument('-lp', '--local-port', dest='lport', type=int,
default=DbgMuxConnUdpProxy.LADDR_DEF[1],
help='Local port (default: %(default)s)')
ch_udp_proxy.add_argument('-ra', '--remote-addr', dest='raddr', type=str,
default=DbgMuxConnUdpProxy.RADDR_DEF[0],
help='Remote address (default: %(default)s)')
ch_udp_proxy.add_argument('-rp', '--remote-port', dest='rport', type=int,
default=DbgMuxConnUdpProxy.RADDR_DEF[1],
help='Remote port (default: %(default)s)')
@cmd2.with_argparser(establish_parser)
@cmd2.with_category(CATEGORY_DBGMUX)
def do_establish(self, opts) -> None:
''' Establish connection with a Data Provider '''
log.info("Establishing connection with DPRef=0x%04x", opts.DPRef)
self.peer.send(DbgMuxFrame.MsgType.ConnEstablish,
dict(DPRef=opts.DPRef))
''' Establish connections with Data Providers '''
if opts.handler == 'terminal':
ch = DbgMuxConnTerminal()
elif opts.handler == 'file-logger':
ch = DbgMuxConnFileLogger(opts.FILE)
elif opts.handler == 'udp-proxy':
ch = DbgMuxConnUdpProxy(laddr=(opts.laddr, opts.lport),
raddr=(opts.raddr, opts.rport))
self.client.conn_establish(opts.DPRef, ch)
if opts.handler == 'terminal':
ch.attach() # blocking until Ctrl + [CD]
ch.terminate()
f = self.peer.recv()
assert f['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished
if f['Msg']['ConnRef'] == 0xffff:
log.warning("Connection failed: unknown DPRef=0x%04x?", opts.DPRef)
self.peer.send(DbgMuxFrame.MsgType.Ack)
return
terminate_parser = cmd2.Cmd2ArgumentParser()
terminate_parser.add_argument('ConnRef',
type=lambda v: int(v, 16),
choices_provider=_tab_connections,
help='ConnRef in hex')
log.info("Connection established (ConnRef=0x%04x)",
f['Msg']['ConnRef'])
# Read the messages
while True:
f = self.peer.recv()
if f is None:
continue # No more data in the buffer
if f['MsgType'] != DbgMuxFrame.MsgType.ConnData:
log.warning('Unexpected frame: %s', f)
self.peer.send(DbgMuxFrame.MsgType.Ack)
continue
try: # FIXME: there can be binary data
self.stdout.write(f['Msg']['Data'].decode())
except: # ... ignore it for now
continue
# ACKnowledge reception of a frame
self.peer.send(DbgMuxFrame.MsgType.Ack)
@cmd2.with_argparser(terminate_parser)
@cmd2.with_category(CATEGORY_DBGMUX)
def do_terminate(self, opts) -> None:
''' Terminate connection with a Data Provider '''
self.client.conn_terminate(opts.ConnRef)
ap = argparse.ArgumentParser(prog='sedbgmux-shell', description=SEDbgMuxApp.DESC)
@ -187,7 +226,7 @@ group.add_argument('--dump-file', metavar='FILE', type=str,
help='save Rx/Tx DebugMux frames to a file')
log.basicConfig(
format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO)
format='\r[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO)
if __name__ == '__main__':
argv = ap.parse_args()

View File

@ -1,3 +1,5 @@
from . import io
from .proto import DbgMuxFrame
from .peer import DbgMuxPeer
from .ping_pong import DbgMuxPingPong
from .client import DbgMuxClient

6
sedbgmux/ch/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from .base import DbgMuxConnHandler
from .base import DbgMuxConnState
from .udp_proxy import DbgMuxConnUdpProxy
from .terminal import DbgMuxConnTerminal
from .file_logger import DbgMuxConnFileLogger

122
sedbgmux/ch/base.py Normal file
View File

@ -0,0 +1,122 @@
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import queue
import enum
import abc
from typing import Any, Optional
from .. import DbgMuxFrame
class DbgMuxConnState(enum.Enum):
''' Connection state for DbgMuxConnHandler '''
NotEstablished = enum.auto()
Establishing = enum.auto()
Established = enum.auto()
class DbgMuxConnHandler(abc.ABC):
''' Abstract DebugMux connection handler '''
def __init__(self):
self.conn_state: DbgMuxConnState = DbgMuxConnState.NotEstablished
self._rx_data_queue: queue.Queue = queue.Queue()
self._tx_data_queue: queue.Queue = queue.Queue()
self._tx_queue: Optional[queue.Queue] = None
self.DataBlockLimit: int = 0
self.ConnRef: int = 0xffff
self.DPRef: int = 0xffff
@abc.abstractmethod
def _conn_established(self) -> None:
''' Called when a connection has been established '''
@abc.abstractmethod
def _conn_terminated(self) -> None:
''' Called when a connection has been terminated '''
def send_msg(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
''' Send a DebugMux message to the target '''
assert self._tx_queue is not None
self._tx_queue.put((msg_type, msg))
def send_data(self, data: bytes) -> None:
''' Send connection data to the target '''
assert self.conn_state == DbgMuxConnState.Established
msg_type = DbgMuxFrame.MsgType.ConnData
msg = dict(ConnRef=self.ConnRef, Data=data)
if self.DataBlockLimit > 0: # Can we send immediately?
self.send_msg(msg_type, msg)
self.DataBlockLimit -= 1
else: # Postpone transmission until a FlowControl is received
self._tx_data_queue.put((msg_type, msg))
def establish(self, DPRef: int, txq: queue.Queue) -> None:
''' Establish connection with a DataProvider '''
assert self.conn_state == DbgMuxConnState.NotEstablished
log.info('Establishing connection with DPRef=0x%04x', DPRef)
self.DPRef = DPRef
self._tx_queue = txq
self.conn_state = DbgMuxConnState.Establishing
self.send_msg(DbgMuxFrame.MsgType.ConnEstablish, dict(DPRef=DPRef))
def terminate(self) -> None:
''' Terminate connection with a DataProvider '''
assert self.conn_state == DbgMuxConnState.Established
log.info('Terminating connection ConnRef=0x%04x with DPRef=0x%04x',
self.ConnRef, self.DPRef)
self.send_msg(DbgMuxFrame.MsgType.ConnTerminate, dict(ConnRef=self.ConnRef))
def _handle_established(self, ConnRef: int, DataBlockLimit: int) -> None:
''' Called on connection establishment '''
assert self.conn_state == DbgMuxConnState.Establishing
log.info('Connection established: DPRef=0x%04x, ConnRef=0x%04x, DataBlockLimit=%u',
self.DPRef, ConnRef, DataBlockLimit)
self.conn_state = DbgMuxConnState.Established
self.DataBlockLimit = DataBlockLimit
self.ConnRef = ConnRef
self._conn_established()
def _handle_terminated(self) -> None:
''' Called on connection termination '''
assert self.conn_state == DbgMuxConnState.Established
log.info('Connection terminated: DPRef=0x%04x, ConnRef=0x%04x',
self.DPRef, self.ConnRef)
self.conn_state = DbgMuxConnState.NotEstablished
# TODO: reset the internal state?
self._conn_terminated()
def _handle_data(self, data: bytes) -> None:
''' Called on reciept of connection data '''
self._rx_data_queue.put(data)
def _handle_flow_control(self, DataBlockLimit: int):
''' Called on reciept of FlowControl message '''
assert self.conn_state == DbgMuxConnState.Established
self.DataBlockLimit = DataBlockLimit
while self.DataBlockLimit > 0:
try:
(msg_type, msg) = self._tx_data_queue.get(block=False)
self.send_msg(msg_type, msg)
self.DataBlockLimit -= 1
self._tx_data_queue.task_done()
except queue.Empty:
break

View File

@ -0,0 +1,57 @@
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import typing
import queue
from . import DbgMuxConnHandler
class DbgMuxConnFileLogger(DbgMuxConnHandler):
''' Log all received connection data to a file (binary mode) '''
def __init__(self, file: typing.BinaryIO):
super().__init__()
self._file = file
self._rx_thread = threading.Thread(target=self._rx_worker,
daemon=True)
self._shutdown = threading.Event()
def _rx_worker(self) -> None:
''' Dequeue data blocks from the Rx queue and write to file '''
while not self._shutdown.is_set():
try:
data: bytes = self._rx_data_queue.get(block=True, timeout=0.2)
self._file.write(data)
self._rx_data_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s-Rx\' is shutting down', self.__class__.__name__)
def _conn_established(self) -> None:
''' Called when a connection has been established '''
self._shutdown.clear()
self._rx_thread.start()
def _conn_terminated(self) -> None:
''' Called when a connection has been terminated '''
self._shutdown.set()
self._rx_thread.join()

72
sedbgmux/ch/terminal.py Normal file
View File

@ -0,0 +1,72 @@
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import queue
import sys
from . import DbgMuxConnHandler
from . import DbgMuxConnState
class DbgMuxConnTerminal(DbgMuxConnHandler):
''' Terminal for communicating with 'Interactive Debug' DPs '''
def __init__(self):
super().__init__()
self.attached: bool = False
self._rx_thread = threading.Thread(target=self._rx_worker,
daemon=True)
self._shutdown = threading.Event()
def attach(self) -> None:
''' Read data blocks from stdin and enqueue to the Tx queue '''
self.attached = True
while not self._shutdown.is_set():
try:
line: str = input()
if self.conn_state == DbgMuxConnState.Established:
self.send_data(bytes(line + '\r', 'ascii'))
except (KeyboardInterrupt, EOFError) as e:
break
self.attached = False
def _rx_worker(self) -> None:
''' Dequeue data blocks from the Rx queue and print to stdout '''
while not self._shutdown.is_set():
try:
data: bytes = self._rx_data_queue.get(block=True, timeout=0.2)
if self.attached:
sys.stdout.write(data.decode('ascii'))
sys.stdout.flush()
self._rx_data_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s-Rx\' is shutting down', self.__class__.__name__)
def _conn_established(self) -> None:
''' Called when a connection has been established '''
self._shutdown.clear()
self._rx_thread.start()
def _conn_terminated(self) -> None:
''' Called when a connection has been terminated '''
self._shutdown.set()
self._rx_thread.join()

88
sedbgmux/ch/udp_proxy.py Normal file
View File

@ -0,0 +1,88 @@
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022-2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import queue
import socket
from typing import Tuple
from .. import DbgMuxFrame
from . import DbgMuxConnHandler
from . import DbgMuxConnState
class DbgMuxConnUdpProxy(DbgMuxConnHandler):
''' Expose a DebugMux connection as a UDP socket '''
LADDR_DEF: Tuple[str, int] = ('127.0.0.1', 2424)
RADDR_DEF: Tuple[str, int] = ('127.0.0.1', 4242)
DGRAM_TIMEOUT_DEF: float = 0.2
DGRAM_MAX_LEN: int = 2048
def __init__(self, *args, **kw):
super().__init__(*args)
self.laddr: Tuple[str, int] = kw.get('laddr', self.LADDR_DEF)
self.raddr: Tuple[str, int] = kw.get('raddr', self.RADDR_DEF)
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(self.laddr)
self._sock.settimeout(self.DGRAM_TIMEOUT_DEF)
self._rx_thread = threading.Thread(target=self._rx_worker,
daemon=True)
self._tx_thread = threading.Thread(target=self._tx_worker,
daemon=True)
self._shutdown = threading.Event()
def _rx_worker(self) -> None:
''' Dequeue data blocks from the Rx queue and send as datagrams '''
while not self._shutdown.is_set():
try:
data: bytes = self._rx_data_queue.get(block=True, timeout=0.2)
self._sock.sendto(data, self.raddr)
self._rx_data_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s-Rx\' is shutting down', self.__class__.__name__)
def _tx_worker(self) -> None:
''' Receive data blocks as datagrams and enqueue to the Tx queue '''
while not self._shutdown.is_set():
try:
(data, addr) = self._sock.recvfrom(self.DGRAM_MAX_LEN)
if self.conn_state == DbgMuxConnState.Established:
self.send_data(data)
except TimeoutError:
pass
log.debug('Thread \'%s-Tx\' is shutting down', self.__class__.__name__)
def _conn_established(self) -> None:
''' Called when a connection has been established '''
self._shutdown.clear()
self._rx_thread.start()
self._tx_thread.start()
def _conn_terminated(self) -> None:
''' Called when a connection has been terminated '''
self._shutdown.set()
self._rx_thread.join()
self._tx_thread.join()

249
sedbgmux/client.py Executable file
View File

@ -0,0 +1,249 @@
#!/usr/bin/env python3
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022-2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import queue
from typing import Dict, Tuple
from construct import Container
from . import DbgMuxFrame
from . import DbgMuxPeer
from . import DbgMuxPingPong
from .ch import DbgMuxConnHandler
from .ch import DbgMuxConnState
class DbgMuxClient:
''' DebugMux client role implementation '''
IDENT_TIMEOUT: float = 0.5
CONN_EST_TIMEOUT: float = 1.0
CONN_TERM_TIMEOUT: float = 1.0
data_providers: Dict[int, str] # { DPRef : Name }
pending_conn: Dict[int, DbgMuxConnHandler] # { DPRef : ch }
active_conn: Dict[int, Tuple[int, DbgMuxConnHandler]] # { ConnRef : (DPRef, ch) }
def __init__(self, peer: DbgMuxPeer):
self.target_name: str = ''
self.target_imei: str = ''
self.data_providers = dict()
self.pending_conn = dict()
self.active_conn = dict()
self.peer: DbgMuxPeer = peer
self.ping_pong = DbgMuxPingPong(peer)
self._rx_thread = threading.Thread(target=self._rx_worker,
name='DbgMuxClient-Rx',
daemon=True)
self._shutdown = threading.Event()
self._ev_ident = threading.Event()
self._ev_conn_est = threading.Event()
self._ev_conn_term = threading.Event()
def start(self) -> None:
''' Start dequeueing messages from peer '''
self._shutdown.clear()
self._rx_thread.start()
def stop(self) -> None:
''' Stop dequeueing messages from peer '''
self._shutdown.set()
self._rx_thread.join()
self._reset()
def _reset(self) -> None:
''' Reset the internal state '''
self.target_name = ''
self.target_imei = ''
for (_, ch) in self.active_conn.values():
ch._handle_terminated()
del ch
self.active_conn.clear()
self.pending_conn.clear()
self.data_providers.clear()
self._ev_ident.clear()
self._ev_conn_est.clear()
self._ev_conn_term.clear()
def enquiry(self) -> bool:
''' Enquiry target identifier and available Data Providers '''
self._ev_ident.clear()
self.peer.send(DbgMuxFrame.MsgType.Enquiry)
if not self._ev_ident.wait(timeout=self.IDENT_TIMEOUT):
log.error('Timeout waiting for identification response')
return False
return True
def ping(self, payload: str = 'ping') -> bool:
''' Send a Ping to the target, expect Pong '''
return self.ping_pong.ping(payload)
def conn_establish(self, DPRef: int, ch: DbgMuxConnHandler) -> bool:
''' Establish connection with a Data Provider '''
if DPRef not in self.data_providers:
log.error('DPRef=0x%04x is not registered', DPRef)
return False
if DPRef in self.pending_conn:
log.error('DPRef=0x%04x already has a pending connection', DPRef)
return False
for conn in self.active_conn.values():
if conn[0] != DPRef:
continue
log.error('DPRef=0x%04x already has an active connection', DPRef)
return False
self._ev_conn_est.clear()
self.pending_conn[DPRef] = ch
ch.establish(DPRef, self.peer._tx_queue)
if not self._ev_conn_est.wait(timeout=self.CONN_EST_TIMEOUT):
log.error('Timeout establishing connection with DPRef=0x%04x', DPRef)
self.pending_conn.pop(DPRef)
return False
return ch.conn_state == DbgMuxConnState.Established
def conn_terminate(self, ConnRef: int) -> bool:
''' Terminate connection with a Data Provider '''
if ConnRef not in self.active_conn:
log.error('ConnRef=0x%04x is not registered', ConnRef)
return False
self._ev_conn_term.clear()
(DPRef, ch) = self.active_conn.get(ConnRef)
ch.terminate()
if not self._ev_conn_term.wait(timeout=self.CONN_TERM_TIMEOUT):
log.error('Timeout terminating connection with DPRef=0x%04x', DPRef)
return False
return ch.conn_state == DbgMuxConnState.NotEstablished
def _rx_worker(self) -> None:
''' Dequeue DebugMux frames from peer and handle them '''
while not self._shutdown.is_set():
try:
frame: Container = self.peer._rx_queue.get(block=True, timeout=0.5)
self._handle_frame(frame)
self.peer._rx_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _handle_frame(self, frame: Container) -> None:
MsgType, Msg = frame['MsgType'], frame['Msg']
if MsgType == DbgMuxFrame.MsgType.Ident:
self._handle_ident(Msg)
self._ev_ident.set()
elif MsgType == DbgMuxFrame.MsgType.DPAnnounce:
self._handle_dp_announce(Msg)
elif MsgType == DbgMuxFrame.MsgType.Pong:
self.ping_pong._handle_pong(Msg)
elif MsgType == DbgMuxFrame.MsgType.ConnEstablished:
self._handle_conn_est(Msg)
self._ev_conn_est.set()
elif MsgType == DbgMuxFrame.MsgType.ConnTerminated:
self._handle_conn_term(Msg)
self._ev_conn_term.set()
elif MsgType == DbgMuxFrame.MsgType.ConnData:
self._handle_conn_data(Msg)
elif MsgType == DbgMuxFrame.MsgType.FlowControl:
self._handle_conn_flow_control(Msg)
return # no Ack
elif MsgType == DbgMuxFrame.MsgType.Ack:
return # no Ack
else:
log.warning('Unhandled DebugMux message %s: %s', MsgType, Msg)
self.peer.send(DbgMuxFrame.MsgType.Ack)
def _handle_ident(self, msg: Container) -> None:
''' Handle DbgMuxFrame.MsgType.Ident '''
self.target_name = msg['Ident'][:-15]
self.target_imei = msg['Ident'][-15:]
log.info('Identified target: \'%s\', IMEI=%s',
self.target_name, self.target_imei)
def _handle_dp_announce(self, msg: Container) -> None:
''' Handle DbgMuxFrame.MsgType.DPAnnounce '''
DPRef, Name = msg['DPRef'], msg['Name']
log.info('Data Provider available (DPRef=0x%04x): \'%s\'', DPRef, Name)
if DPRef in self.data_providers:
log.warning('DPRef=0x%04x was already announced', DPRef)
self.data_providers[DPRef] = Name
def _handle_conn_est(self, msg: Container) -> None:
''' Handle DbgMuxFrame.MsgType.ConnEstablished '''
DPRef, ConnRef = msg['DPRef'], msg['ConnRef']
log.info('Rx ConnEstablished: ConnRef=0x%04x, DPRef=0x%04x', ConnRef, DPRef)
if ConnRef == 0xffff:
log.warning('Connection establishment failed: '
'no such DPRef=0x%04x?', DPRef)
return
elif ConnRef in self.active_conn:
log.error('ConnRef=0x%04x is already established', ConnRef)
return
elif DPRef not in self.data_providers:
log.error('DPRef=0x%04x is not registered', DPRef)
return
elif DPRef not in self.pending_conn:
log.error('DPRef=0x%04x has no pending connection', DPRef)
return
ch = self.pending_conn.pop(DPRef)
self.active_conn[ConnRef] = (DPRef, ch)
ch._handle_established(ConnRef, msg['DataBlockLimit'])
def _handle_conn_term(self, msg: Container) -> None:
''' Handle DbgMuxFrame.MsgType.ConnTerminated '''
DPRef, ConnRef = msg['DPRef'], msg['ConnRef']
log.info('Rx ConnTerminated: ConnRef=0x%04x, DPRef=0x%04x', ConnRef, DPRef)
if ConnRef == 0xffff:
log.warning('Connection termination failed: '
'no such DPRef=0x%04x?', DPRef)
return
elif ConnRef not in self.active_conn:
log.error('ConnRef=0x%04x is not established', ConnRef)
return
elif DPRef not in self.data_providers:
log.error('DPRef=0x%04x is not registered', DPRef)
return
# Old DPRef becomes invalid, remove it
del self.data_providers[DPRef]
(_, ch) = self.active_conn.pop(ConnRef)
ch._handle_terminated()
def _handle_conn_data(self, msg: Container) -> None:
''' Handle DbgMuxFrame.MsgType.ConnData '''
ConnRef, Data = msg['ConnRef'], msg['Data']
if ConnRef not in self.active_conn:
log.error('ConnRef=0x%04x is not established', ConnRef)
return
(DPRef, ch) = self.active_conn.get(ConnRef)
ch._handle_data(Data)
def _handle_conn_flow_control(self, msg: Container) -> None:
''' Handle DbgMuxFrame.MsgType.FlowControl '''
ConnRef, DataBlockLimit = msg['ConnRef'], msg['DataBlockLimit']
if ConnRef not in self.active_conn:
log.error('ConnRef=0x%04x is not established', ConnRef)
return
(DPRef, ch) = self.active_conn.get(ConnRef)
ch._handle_flow_control(DataBlockLimit)

View File

@ -17,6 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
import queue
from typing import Any, Optional
from construct import Const, Container, Int16ul
@ -33,10 +35,57 @@ class DbgMuxPeer:
self.rx_count: int = 0
self.io = io
# Threads handling Rx/Tx frames
self._rx_thread = threading.Thread(target=self._rx_worker,
name='DbgMuxPeer-Rx',
daemon=True)
self._tx_thread = threading.Thread(target=self._tx_worker,
name='DbgMuxPeer-Tx',
daemon=True)
self._shutdown = threading.Event()
# Internal queues for Rx/Tx frames
self._rx_queue = queue.Queue()
self._tx_queue = queue.Queue()
def enable_dump(self, dump: DumpIO):
self.dump = dump
def start(self) -> None:
self._shutdown.clear()
self._rx_thread.start()
self._tx_thread.start()
def stop(self) -> None:
# Set the shutdown event
self._shutdown.set()
# Wait for both threads to terminate
self._tx_thread.join()
self._rx_thread.join()
def _rx_worker(self) -> None:
while not self._shutdown.is_set():
frame = self._recv() # blocking until timeout
if frame is not None:
self._rx_queue.put(frame)
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def _tx_worker(self) -> None:
while not self._shutdown.is_set():
try:
(msg_type, msg) = self._tx_queue.get(block=True, timeout=0.5)
self._send(msg_type, msg)
self._tx_queue.task_done()
except queue.Empty:
pass
log.debug('Thread \'%s\' is shutting down', threading.current_thread().name)
def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
''' Send a single message (non-blocking call) '''
self._tx_queue.put((msg_type, msg))
def _send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None:
# Encode the inner message first
msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type)
@ -72,7 +121,11 @@ class DbgMuxPeer:
if msg_type != DbgMuxFrame.MsgType.Ack:
self.tx_count += 1
def recv(self) -> Optional[Container]:
def recv(self, timeout: Optional[float] = None) -> Container:
''' Receive a single message (blocking call) '''
return self._rx_queue.get(block=True, timeout=timeout)
def _recv(self) -> Optional[Container]:
frame: bytes = b''
frame += self.io.read(2) # Magic
if frame == b'':

53
sedbgmux/ping_pong.py Executable file
View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
# This file is a part of sedbgmux, an open source DebugMux client.
# Copyright (c) 2022-2023 Vadim Yanitskiy <fixeria@osmocom.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging as log
import threading
from . import DbgMuxFrame
from . import DbgMuxPeer
class DbgMuxPingPong:
''' Implements link testing logic '''
def __init__(self, peer: DbgMuxPeer, timeout: float = 1.0):
self.peer: DbgMuxPeer = peer
self.timeout: float = timeout
self._pong = threading.Event()
self._expect_pong: bool = False
def ping(self, payload: str) -> bool:
log.info('Sending Ping with payload \'%s\'', payload)
self._pong.clear()
self._expect_pong = True
self.peer.send(DbgMuxFrame.MsgType.Ping, payload)
if not self._pong.wait(timeout=self.timeout):
log.warning('Timeout waiting for Pong')
return False
return True
def _handle_pong(self, payload: str) -> None:
if not self._expect_pong:
log.warning('Rx unexpected Pong, sending ACK anyway')
return
log.info('Rx Pong with payload \'%s\'', payload)
self._expect_pong = False
self._pong.set()