# This file is a part of sedbgmux, an open source DebugMux client. # Copyright (c) 2023 Vadim Yanitskiy # # SPDX-License-Identifier: GPL-3.0-or-later # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging import threading import queue import re from typing import NewType, List, Tuple from . import DbgMuxConnHandler from . import DbgMuxConnState # local logger for this module log = logging.getLogger(__name__) # a directory item type DirItem = NewType('DirItem', Tuple[str, str]) class DbgMuxConnWalker(DbgMuxConnHandler): ''' A tool to build a hierarchy of 'Interactive Debug' DPs ''' line_pattern = re.compile(r'^(\S+)\s+([<(]\S+[>)])') def __init__(self): super().__init__() self._connected = threading.Event() def _conn_established(self) -> None: ''' Called when a connection has been established ''' self._connected.set() def _conn_terminated(self) -> None: ''' Called when a connection has been terminated ''' self._connected.clear() def send_cmd(self, cmd: str) -> None: ''' Send a command ''' self.send_data(cmd.encode() + b'\r') def read_rsp(self) -> List[str]: ''' Receive a response, return a list of (not empty) lines ''' data: bytes = b'' while self._connected.is_set(): try: data += self._rx_data_queue.get(block=True, timeout=0.2) self._rx_data_queue.task_done() except queue.Empty: break rsp = data.decode('ascii').split('\r\n') return [l for l in rsp if l != ''] def do_chdir(self, dirname: str) -> None: ''' List current directory contents ''' self.send_cmd(f'cd {dirname}') self.read_rsp() # unused def do_list(self) -> List[DirItem]: ''' List current directory contents ''' self.send_cmd('ls') rsp = self.read_rsp() items = [] for line in rsp: m = self.line_pattern.match(line) if m is None: continue items.append(m.groups()) return items def walk_dir(self, path: str = '') -> List[DirItem]: items: List[DirItem] = self.do_list() for item_name, item_type in items: print(f'{item_type}\t{path}/{item_name}') if item_type == '': self.do_chdir(item_name) self.walk_dir(f'{path}/{item_name}') self.do_chdir('..') def walk(self) -> None: if not self._connected.wait(0.5): log.error('Connection is not established (yet?)') return # read the welcome message and prompt self.read_rsp() # get listing of the root directory self.do_chdir('/') self.walk_dir()