diff --git a/peer.py b/peer.py index 936cb55..1c850d1 100644 --- a/peer.py +++ b/peer.py @@ -22,53 +22,54 @@ from typing import Any from construct import Container, Int16ul from proto import DbgMuxFrame + class DbgMuxPeer: - def __init__(self, sl): - self.tx_count: int = 0 - self.rx_count: int = 0 - self._sl = sl + def __init__(self, sl): + self.tx_count: int = 0 + self.rx_count: int = 0 + self._sl = sl - def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: - # Encode the inner message first - msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type) + def send(self, msg_type: DbgMuxFrame.MsgType, msg: Any = b'') -> None: + # Encode the inner message first + msg_data = DbgMuxFrame.Msg.build(msg, MsgType=msg_type) - c = Container({ - 'TxCount' : (self.tx_count + 1) % 256, - 'RxCount' : self.rx_count % 256, - 'MsgType' : msg_type, - 'MsgData' : msg_data, - 'FCS' : 0 # Calculated below - }) + c = Container({ + 'TxCount': (self.tx_count + 1) % 256, + 'RxCount': self.rx_count % 256, + 'MsgType': msg_type, + 'MsgData': msg_data, + 'FCS': 0 # Calculated below + }) - # ACK is a bit special - if msg_type == DbgMuxFrame.MsgType.Ack: - c['TxCount'] = 0xf1 + # ACK is a bit special + if msg_type == DbgMuxFrame.MsgType.Ack: + c['TxCount'] = 0xf1 - # There is a Checksum construct, but it requires all checksummed fields - # to be wrapped into an additional RawCopy construct. This is ugly and - # inconvinient from the API point of view, so we calculate the FCS manually: - frame = DbgMuxFrame.Frame.build(c)[:-2] # strip b'\x00\x00' - c['FCS'] = DbgMuxFrame.fcs_func(frame) + # There is a Checksum construct, but it requires all checksummed fields + # to be wrapped into an additional RawCopy construct. This is ugly and + # inconvinient from the API point of view, so we calculate the FCS manually: + frame = DbgMuxFrame.Frame.build(c)[:-2] # strip b'\x00\x00' + c['FCS'] = DbgMuxFrame.fcs_func(frame) - log.debug('Tx frame (Ns=%d, Nr=%d, fcs=0x%04x) %s %s', - c['TxCount'], c['RxCount'], c['FCS'], - c['MsgType'], c['MsgData'].hex()) + log.debug('Tx frame (Ns=%d, Nr=%d, fcs=0x%04x) %s %s', + c['TxCount'], c['RxCount'], c['FCS'], + c['MsgType'], c['MsgData'].hex()) - self._sl.write(frame + Int16ul.build(c['FCS'])) + self._sl.write(frame + Int16ul.build(c['FCS'])) - # ACK is not getting accounted - if msg_type != DbgMuxFrame.MsgType.Ack: - self.tx_count += 1 + # ACK is not getting accounted + if msg_type != DbgMuxFrame.MsgType.Ack: + self.tx_count += 1 - def recv(self) -> Container: - c = DbgMuxFrame.Frame.parse_stream(self._sl) + def recv(self) -> Container: + c = DbgMuxFrame.Frame.parse_stream(self._sl) - log.debug('Rx frame (Ns=%d, Nr=%d, fcs=0x%04x) %s %s', - c['TxCount'], c['RxCount'], c['FCS'], - c['MsgType'], c['MsgData'].hex()) + log.debug('Rx frame (Ns=%d, Nr=%d, fcs=0x%04x) %s %s', + c['TxCount'], c['RxCount'], c['FCS'], + c['MsgType'], c['MsgData'].hex()) - # Parse the inner message - c['Msg'] = DbgMuxFrame.Msg.parse(c['MsgData'], MsgType=c['MsgType']) + # Parse the inner message + c['Msg'] = DbgMuxFrame.Msg.parse(c['MsgData'], MsgType=c['MsgType']) - self.rx_count += 1 - return c + self.rx_count += 1 + return c diff --git a/proto.py b/proto.py index 88623ed..50d0e9d 100644 --- a/proto.py +++ b/proto.py @@ -20,86 +20,86 @@ from construct import * import crcmod -# DebugMux frame definition + class DbgMuxFrame: - # Kudos to Stefan @Sec Zehl for finding the CRC function parameters - fcs_func = crcmod.mkCrcFun(0x11021, rev=True, initCrc=0x0, xorOut=0xffff) + ''' DebugMux frame definition ''' - MsgType = Enum(subcon=Int8ul, - Enquiry = 0x65, # 'e' - Ident = 0x66, # 'f' - Ping = 0x67, # 'g' - Pong = 0x68, # 'h' + # Kudos to Stefan @Sec Zehl for finding the CRC function parameters + fcs_func = crcmod.mkCrcFun(0x11021, rev=True, initCrc=0x0, xorOut=0xffff) - DPAnnounce = 0x69, # 'i' - # TODO: = 0x6a, # 'j' - ConnEstablish = 0x6b, # 'k' - ConnEstablished = 0x6c, # 'l' - ConnTerminate = 0x6d, # 'm' - ConnTerminated = 0x6e, # 'n' - ConnData = 0x6f, # 'o' + MsgType = Enum(subcon=Int8ul, + Enquiry = 0x65, # 'e' + Ident = 0x66, # 'f' + Ping = 0x67, # 'g' + Pong = 0x68, # 'h' + DPAnnounce = 0x69, # 'i' + # TODO: = 0x6a, # 'j' + ConnEstablish = 0x6b, # 'k' + ConnEstablished = 0x6c, # 'l' + ConnTerminate = 0x6d, # 'm' + ConnTerminated = 0x6e, # 'n' + ConnData = 0x6f, # 'o' + # TODO: = 0x70, # 'p' + Ack = 0x71, # 'q' + ) - # TODO: = 0x70, # 'p' - Ack = 0x71, # 'q' - ) + Frame = Struct( + 'Magic' / Const(b'\x42\x42'), + 'Length' / Rebuild(Int16ul, lambda ctx: len(ctx.MsgData) + 5), + 'TxCount' / Int8ul, + 'RxCount' / Int8ul, + 'MsgType' / MsgType, + 'MsgData' / Bytes(lambda ctx: ctx.Length - 5), + 'FCS' / Int16ul, # fcs_func() on all preceeding fields + ) - Frame = Struct( - 'Magic' / Const(b'\x42\x42'), - 'Length' / Rebuild(Int16ul, lambda ctx: len(ctx.MsgData) + 5), - 'TxCount' / Int8ul, - 'RxCount' / Int8ul, - 'MsgType' / MsgType, - 'MsgData' / Bytes(lambda ctx: ctx.Length - 5), - 'FCS' / Int16ul, # fcs_func() on all preceeding fields - ) + # MsgType.Ident structure + MsgIdent = Struct( + 'Magic' / Bytes(4), # TODO + 'Ident' / PascalString(Int8ul, 'ascii'), + ) - # MsgType.Ident structure - MsgIdent = Struct( - 'Magic' / Bytes(4), # TODO - 'Ident' / PascalString(Int8ul, 'ascii'), - ) + # MsgType.{Ping,Pong} structure + MsgPingPong = PascalString(Int8ul, 'ascii') - # MsgType.{Ping,Pong} structure - MsgPingPong = PascalString(Int8ul, 'ascii') + # MsgType.DPAnnounce structure + MsgDPAnnounce = Struct( + 'DPRef' / Int16ul, + 'Name' / PascalString(Int8ul, 'ascii'), + ) - # MsgType.DPAnnounce structure - MsgDPAnnounce = Struct( - 'DPRef' / Int16ul, - 'Name' / PascalString(Int8ul, 'ascii'), - ) + # MsgType.ConnEstablish[ed] structure + MsgConnEstablish = Struct('DPRef' / Int16ul) + MsgConnEstablished = Struct( + 'DPRef' / Int16ul, + 'ConnRef' / Int16ul, + 'DataBlockLimit' / Int16ul, + ) - # MsgType.ConnEstablish[ed] structure - MsgConnEstablish = Struct('DPRef' / Int16ul) - MsgConnEstablished = Struct( - 'DPRef' / Int16ul, - 'ConnRef' / Int16ul, - 'DataBlockLimit' / Int16ul, - ) + # MsgType.ConnTerminate[ed] structure + MsgConnTerminate = Struct('ConnRef' / Int16ul) + MsgConnTerminated = Struct( + 'DPRef' / Int16ul, + 'ConnRef' / Int16ul, + ) - # MsgType.ConnTerminate[ed] structure - MsgConnTerminate = Struct('ConnRef' / Int16ul) - MsgConnTerminated = Struct( - 'DPRef' / Int16ul, - 'ConnRef' / Int16ul, - ) + # MsgType.ConnData structure + MsgConnData = Struct( + 'ConnRef' / Int16ul, + 'Data' / GreedyBytes, + ) - # MsgType.ConnData structure - MsgConnData = Struct( - 'ConnRef' / Int16ul, - 'Data' / GreedyBytes, - ) - - # Complete message definition - Msg = Switch(this.MsgType, default=GreedyBytes, cases={ - MsgType.Enquiry : Const(b''), - MsgType.Ident : MsgIdent, - MsgType.Ping : MsgPingPong, - MsgType.Pong : MsgPingPong, - MsgType.DPAnnounce : MsgDPAnnounce, - MsgType.ConnEstablish : MsgConnEstablish, - MsgType.ConnEstablished : MsgConnEstablished, - MsgType.ConnTerminate : MsgConnTerminate, - MsgType.ConnTerminated : MsgConnTerminated, - MsgType.ConnData : MsgConnData, - MsgType.Ack : Const(b''), - }) + # Complete message definition + Msg = Switch(this.MsgType, default=GreedyBytes, cases={ + MsgType.Enquiry : Const(b''), + MsgType.Ident : MsgIdent, + MsgType.Ping : MsgPingPong, + MsgType.Pong : MsgPingPong, + MsgType.DPAnnounce : MsgDPAnnounce, + MsgType.ConnEstablish : MsgConnEstablish, + MsgType.ConnEstablished : MsgConnEstablished, + MsgType.ConnTerminate : MsgConnTerminate, + MsgType.ConnTerminated : MsgConnTerminated, + MsgType.ConnData : MsgConnData, + MsgType.Ack : Const(b''), + }) diff --git a/sedbgmux.py b/sedbgmux.py index 26e4d53..6ec06d4 100755 --- a/sedbgmux.py +++ b/sedbgmux.py @@ -28,187 +28,187 @@ import sys from proto import DbgMuxFrame from peer import DbgMuxPeer + class SEDbgMuxApp(cmd2.Cmd): - DESC = 'DebugMux client for [Sony] Ericsson phones and modems' + DESC = 'DebugMux client for [Sony] Ericsson phones and modems' - # Command categories - CATEGORY_CONN = 'Connection management commands' - CATEGORY_DBGMUX = 'DebugMux specific commands' + # Command categories + CATEGORY_CONN = 'Connection management commands' + CATEGORY_DBGMUX = 'DebugMux specific commands' - def __init__(self, argv): - super().__init__(allow_cli_args=False) + def __init__(self, argv): + super().__init__(allow_cli_args=False) - self.intro = cmd2.style('Welcome to %s!' % self.DESC, fg=cmd2.fg.red) - self.prompt = 'DebugMux (\'%s\')> ' % argv.serial_port - self.default_category = 'Built-in commands' - self.argv = argv + self.intro = cmd2.style('Welcome to %s!' % self.DESC, fg=cmd2.fg.red) + self.prompt = 'DebugMux (\'%s\')> ' % argv.serial_port + self.default_category = 'Built-in commands' + self.argv = argv - # Modem connection state - self.set_connected(False) + # Modem connection state + self.set_connected(False) - def set_connected(self, state: bool) -> None: - self.connected: bool = state - if self.connected: - self.enable_category(self.CATEGORY_DBGMUX) - else: - msg = 'You must be connected to use this command' - self.disable_category(self.CATEGORY_DBGMUX, msg) + def set_connected(self, state: bool) -> None: + self.connected: bool = state + if self.connected: + self.enable_category(self.CATEGORY_DBGMUX) + else: + msg = 'You must be connected to use this command' + self.disable_category(self.CATEGORY_DBGMUX, msg) - @cmd2.with_category(CATEGORY_CONN) - def do_connect(self, opts) -> None: - ''' Connect to the modem and switch it to DebugMux mode ''' - slp = { - 'port' : self.argv.serial_port, - 'baudrate' : self.argv.serial_baudrate, - 'bytesize' : 8, - 'parity' : 'N', - 'stopbits' : 1, - 'timeout' : self.argv.serial_timeout, - # 'xonoff' : False, - 'rtscts' : False, - 'dsrdtr' : False, - } - self.sl = serial.Serial(**slp) + @cmd2.with_category(CATEGORY_CONN) + def do_connect(self, opts) -> None: + ''' Connect to the modem and switch it to DebugMux mode ''' + self.sl = serial.Serial(port=self.argv.serial_port, + baudrate=self.argv.serial_baudrate, + bytesize=8, parity='N', stopbits=1, + timeout=self.argv.serial_timeout, + # xonoff=False, + rtscts=False, + dsrdtr=False) - # Test the modem - self.transceive('AT', 'OK') - # Enable DebugMux mode - self.transceive('AT*EDEBUGMUX', 'CONNECT') - # Init DebugMux peer - self.peer = DbgMuxPeer(self.sl) - self.set_connected(True) + # Test the modem + self.transceive('AT', 'OK') + # Enable DebugMux mode + self.transceive('AT*EDEBUGMUX', 'CONNECT') + # Init DebugMux peer + self.peer = DbgMuxPeer(self.sl) + self.set_connected(True) - @cmd2.with_category(CATEGORY_CONN) - def do_disconnect(self, opts) -> None: - ''' Disconnect from the modem ''' - self.sl.close() - self.sl = None - self.peer = None - self.set_connected(False) + @cmd2.with_category(CATEGORY_CONN) + def do_disconnect(self, opts) -> None: + ''' Disconnect from the modem ''' + self.sl.close() + self.sl = None + self.peer = None + self.set_connected(False) - @cmd2.with_category(CATEGORY_CONN) - def do_status(self, opts) -> None: - ''' Print connection info and statistics ''' - if not self.connected: - self.poutput('Not connected') - return - self.poutput('Connected to \'%s\'' % self.argv.serial_port) - self.poutput('Baudrate: %d' % self.argv.serial_baudrate) - self.poutput('TxCount (Ns): %d' % self.peer.tx_count) - self.poutput('RxCount (Nr): %d' % self.peer.rx_count) + @cmd2.with_category(CATEGORY_CONN) + def do_status(self, opts) -> None: + ''' Print connection info and statistics ''' + if not self.connected: + self.poutput('Not connected') + return + self.poutput('Connected to \'%s\'' % self.argv.serial_port) + self.poutput('Baudrate: %d' % self.argv.serial_baudrate) + self.poutput('TxCount (Ns): %d' % self.peer.tx_count) + self.poutput('RxCount (Nr): %d' % self.peer.rx_count) - @cmd2.with_category(CATEGORY_DBGMUX) - def do_enquiry(self, opts) -> None: - ''' Enquiry target identifier and available Data Providers ''' - self.peer.send(DbgMuxFrame.MsgType.Enquiry) - while True: - f = self.peer.recv() - if f['MsgType'] == DbgMuxFrame.MsgType.Ident: - log.info("Identified target: '%s', IMEI=%s", - f['Msg']['Ident'][:-15], - f['Msg']['Ident'][-15:]) - elif f['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce: - log.info("Data Provider available (DPRef=0x%04x): '%s'", - f['Msg']['DPRef'], f['Msg']['Name']) + @cmd2.with_category(CATEGORY_DBGMUX) + def do_enquiry(self, opts) -> None: + ''' Enquiry target identifier and available Data Providers ''' + self.peer.send(DbgMuxFrame.MsgType.Enquiry) + while True: + f = self.peer.recv() + if f['MsgType'] == DbgMuxFrame.MsgType.Ident: + log.info("Identified target: '%s', IMEI=%s", + f['Msg']['Ident'][:-15], + f['Msg']['Ident'][-15:]) + elif f['MsgType'] == DbgMuxFrame.MsgType.DPAnnounce: + log.info("Data Provider available (DPRef=0x%04x): '%s'", + f['Msg']['DPRef'], f['Msg']['Name']) - # No more data in the buffer - if self.sl.in_waiting == 0: - break + # No more data in the buffer + if self.sl.in_waiting == 0: + break - # ACKnowledge reception of the info - self.peer.send(DbgMuxFrame.MsgType.Ack) + # ACKnowledge reception of the info + self.peer.send(DbgMuxFrame.MsgType.Ack) - ping_parser = cmd2.Cmd2ArgumentParser() - ping_parser.add_argument('-p', '--payload', - type=str, default='Knock, knock!', - help='Ping payload') + ping_parser = cmd2.Cmd2ArgumentParser() + ping_parser.add_argument('-p', '--payload', + type=str, default='Knock, knock!', + help='Ping payload') - @cmd2.with_argparser(ping_parser) - @cmd2.with_category(CATEGORY_DBGMUX) - def do_ping(self, opts) -> None: - ''' Send a Ping to the target, expect Pong ''' - log.info('Tx Ping with payload \'%s\'', opts.payload) - self.peer.send(DbgMuxFrame.MsgType.Ping, opts.payload) + @cmd2.with_argparser(ping_parser) + @cmd2.with_category(CATEGORY_DBGMUX) + def do_ping(self, opts) -> None: + ''' Send a Ping to the target, expect Pong ''' + log.info('Tx Ping with payload \'%s\'', opts.payload) + self.peer.send(DbgMuxFrame.MsgType.Ping, opts.payload) - f = self.peer.recv() - assert f['MsgType'] == DbgMuxFrame.MsgType.Pong - log.info('Rx Pong with payload \'%s\'', f['Msg']) + f = self.peer.recv() + assert f['MsgType'] == DbgMuxFrame.MsgType.Pong + log.info('Rx Pong with payload \'%s\'', f['Msg']) - establish_parser = cmd2.Cmd2ArgumentParser() - establish_parser.add_argument('DPRef', - type=lambda v: int(v, 16), - help='DPRef of a Data Provider in hex') + establish_parser = cmd2.Cmd2ArgumentParser() + establish_parser.add_argument('DPRef', + type=lambda v: int(v, 16), + help='DPRef of a Data Provider in hex') - @cmd2.with_argparser(establish_parser) - @cmd2.with_category(CATEGORY_DBGMUX) - def do_establish(self, opts) -> None: - ''' Establish connection with a Data Provider ''' - log.info("Establishing connection with DPRef=0x%04x", opts.DPRef) - self.peer.send(DbgMuxFrame.MsgType.ConnEstablish, dict(DPRef=opts.DPRef)) + @cmd2.with_argparser(establish_parser) + @cmd2.with_category(CATEGORY_DBGMUX) + def do_establish(self, opts) -> None: + ''' Establish connection with a Data Provider ''' + log.info("Establishing connection with DPRef=0x%04x", opts.DPRef) + self.peer.send(DbgMuxFrame.MsgType.ConnEstablish, + dict(DPRef=opts.DPRef)) - f = self.peer.recv() - assert f['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished - if f['Msg']['ConnRef'] == 0xffff: - log.warning("Connection failed: unknown DPRef=0x%04x?", opts.DPRef) - self.peer.send(DbgMuxFrame.MsgType.Ack) - return + f = self.peer.recv() + assert f['MsgType'] == DbgMuxFrame.MsgType.ConnEstablished + if f['Msg']['ConnRef'] == 0xffff: + log.warning("Connection failed: unknown DPRef=0x%04x?", opts.DPRef) + self.peer.send(DbgMuxFrame.MsgType.Ack) + return - log.info("Connection established (ConnRef=0x%04x)", f['Msg']['ConnRef']) + log.info("Connection established (ConnRef=0x%04x)", + f['Msg']['ConnRef']) - # Read the messages - while True: - f = self.peer.recv() + # Read the messages + while True: + f = self.peer.recv() - if f['MsgType'] != DbgMuxFrame.MsgType.ConnData: - log.warning('Unexpected frame: %s', f) - self.peer.send(DbgMuxFrame.MsgType.Ack) - continue - try: # FIXME: there can be binary data - self.stdout.write(f['Msg']['Data'].decode()) - except: # ... ignore it for now - continue + if f['MsgType'] != DbgMuxFrame.MsgType.ConnData: + log.warning('Unexpected frame: %s', f) + self.peer.send(DbgMuxFrame.MsgType.Ack) + continue + try: # FIXME: there can be binary data + self.stdout.write(f['Msg']['Data'].decode()) + except: # ... ignore it for now + continue - # ACKnowledge reception of a frame - self.peer.send(DbgMuxFrame.MsgType.Ack) + # ACKnowledge reception of a frame + self.peer.send(DbgMuxFrame.MsgType.Ack) - def send_data(self, data: bytes) -> None: - log.debug("MODEM <- %s", str(data)) - self.sl.write(data) + def send_data(self, data: bytes) -> None: + log.debug("MODEM <- %s", str(data)) + self.sl.write(data) - def send_at_cmd(self, cmd: str, handle_echo:bool = True) -> None: - self.send_data(cmd.encode() + b'\r') - if handle_echo: - self.sl.readline() + def send_at_cmd(self, cmd: str, handle_echo: bool = True) -> None: + self.send_data(cmd.encode() + b'\r') + if handle_echo: + self.sl.readline() - def read_at_rsp(self) -> str: - rsp = self.sl.readline() - log.debug("MODEM -> %s", str(rsp)) - return rsp.rstrip().decode() + def read_at_rsp(self) -> str: + rsp = self.sl.readline() + log.debug("MODEM -> %s", str(rsp)) + return rsp.rstrip().decode() - def transceive(self, cmd: str, exp: str) -> None: - while True: - self.send_at_cmd(cmd) - rsp = self.read_at_rsp() + def transceive(self, cmd: str, exp: str) -> None: + while True: + self.send_at_cmd(cmd) + rsp = self.read_at_rsp() + + if rsp[:7] == '*EMRDY:': + continue + assert rsp == exp + break - if rsp[:7] == '*EMRDY:': - continue - assert rsp == exp - break ap = argparse.ArgumentParser(prog='sedbgmux', description=SEDbgMuxApp.DESC, - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + formatter_class=argparse.ArgumentDefaultsHelpFormatter) group = ap.add_argument_group('Connection parameters') group.add_argument('-p', '--serial-port', metavar='PORT', type=str, default='/dev/ttyACM0', - help='Serial port path (default %(default)s)') + help='Serial port path (default %(default)s)') group.add_argument('--serial-baudrate', metavar='BAUDRATE', type=int, default=115200, - help='Serial port speed (default %(default)s)') + help='Serial port speed (default %(default)s)') group.add_argument('--serial-timeout', metavar='TIMEOUT', type=int, - help='Serial port timeout') + help='Serial port timeout') -log.basicConfig(format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO) +log.basicConfig( + format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO) if __name__ == '__main__': - argv = ap.parse_args() - app = SEDbgMuxApp(argv) - sys.exit(app.cmdloop()) + argv = ap.parse_args() + app = SEDbgMuxApp(argv) + sys.exit(app.cmdloop())