osmo_verify_*: clarify naming to osmo_interact_*
Have common implementations in one place, and expose distinct command line argument signatures to obtain the separate tasks as separate scripts. osmo_interact_common.py implements the parts common to all VTY and CTRL interactions. osmo_interact_vty.py and osmo_interact_ctrl.py implement commands I/O but only expose command line args to directly pipe commands and responses. osmo_verify_transcript_vty.py and osmo_verify_transcript_ctrl.py act as before, now implemented by importing osmo_interact_{vty,ctrl}.py, only exposing the verifification command line arguments. Change-Id: Ie0cbd5db85ebebc893df638a07f5568632563dc9
This commit is contained in:
parent
08d645b01d
commit
6562c085c4
|
@ -4,6 +4,8 @@ __version__ = '0.0.1'
|
|||
__all__ = ['obscvty', 'osmodumpdoc', 'osmotestconfig', 'osmotestvty',
|
||||
'osmoutil',
|
||||
'osmo_ipa',
|
||||
'osmo_verify_transcript_common',
|
||||
'osmo_interact_common',
|
||||
'osmo_interact_vty',
|
||||
'osmo_interact_ctrl',
|
||||
'osmo_verify_transcript_vty',
|
||||
'osmo_verify_transcript_ctrl']
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Common code for verify_transcript_vty.py and verify_transcript_ctrl.py.
|
||||
Common code for osmo_interact_vty.py and osmo_interact_ctrl.py.
|
||||
This implements all of application interaction, piping and verification.
|
||||
osmo_interact_{vty,ctrl}.py plug VTY and CTRL interface specific bits.
|
||||
'''
|
||||
|
||||
import argparse
|
||||
|
@ -249,13 +251,14 @@ class Interact:
|
|||
output.write('\n'.join(res))
|
||||
output.write('\n')
|
||||
|
||||
def end_process(proc):
|
||||
def end_process(proc, quiet=False):
|
||||
if not proc:
|
||||
return
|
||||
|
||||
rc = proc.poll()
|
||||
if rc is not None:
|
||||
print('Process has already terminated with', rc)
|
||||
if not quiet:
|
||||
print('Process has already terminated with', rc)
|
||||
proc.wait()
|
||||
return
|
||||
|
||||
|
@ -277,9 +280,11 @@ def end_process(proc):
|
|||
if proc.poll() is None:
|
||||
# termination seems to be slower than that, let's just kill
|
||||
proc.kill()
|
||||
print("Killed child process")
|
||||
if not quiet:
|
||||
print("Killed child process")
|
||||
elif waited_time > .002:
|
||||
print("Terminating took %.3fs" % waited_time)
|
||||
if not quiet:
|
||||
print("Terminating took %.3fs" % waited_time)
|
||||
proc.wait()
|
||||
|
||||
class Application:
|
||||
|
@ -307,7 +312,7 @@ class Application:
|
|||
self.proc = subprocess.Popen(self.command_tuple, stdout=out_err, stderr=out_err)
|
||||
|
||||
def stop(self):
|
||||
end_process(self.proc)
|
||||
end_process(self.proc, self.quiet)
|
||||
|
||||
def verify_application(run_app_str, interact, transcript_file, verbose):
|
||||
passed = None
|
||||
|
@ -347,31 +352,31 @@ def common_parser():
|
|||
help="Port that the application opens.")
|
||||
parser.add_argument('-H', '--host', dest='host', default='localhost',
|
||||
help="Host that the application opens the port on.")
|
||||
return parser
|
||||
|
||||
def parser_add_verify_args(parser):
|
||||
parser.add_argument('-u', '--update', dest='update', action='store_true',
|
||||
help='Do not verify, but OVERWRITE transcripts based on'
|
||||
' the applications current behavior. OVERWRITES TRANSCRIPT'
|
||||
' FILES.')
|
||||
parser.add_argument('-v', '--verbose', action='store_true',
|
||||
help='Print commands and application output')
|
||||
parser.add_argument('-O', '--output', dest='output_path',
|
||||
help="Do not verify a transcript file, but directly"
|
||||
" write command results to a file, '-' means stdout."
|
||||
" If input files are provided, these must not be transcript"
|
||||
" files, but bare commands, one per line, without"
|
||||
" prompts nor expected results."
|
||||
" If neither --command nor input files are"
|
||||
" provided, read commands from stdin."
|
||||
" Ignore '--update'.")
|
||||
parser.add_argument('-c', '--command', dest='cmd_str',
|
||||
help="Run this command instead of reading from a"
|
||||
" transcript file, multiple commands may be separated"
|
||||
" by ';'. Implies '-O -' unless -O is passed.")
|
||||
parser.add_argument('transcript_files', nargs='*', help='transcript files to verify')
|
||||
parser.add_argument('transcript_files', nargs='*', help='transcript file(s) to verify')
|
||||
return parser
|
||||
|
||||
def run_commands(run_app_str, output_path, cmd_str, cmd_files, interact):
|
||||
def parser_add_run_args(parser):
|
||||
parser.add_argument('-O', '--output', dest='output_path',
|
||||
help="Write command results to a file instead of stdout."
|
||||
"('-O -' writes to stdout and is the default)")
|
||||
parser.add_argument('-c', '--command', dest='cmd_str',
|
||||
help="Run this command (before reading input files, if any)."
|
||||
" multiple commands may be separated by ';'")
|
||||
parser.add_argument('cmd_files', nargs='*', help='file(s) with plain commands to run')
|
||||
return parser
|
||||
|
||||
def main_run_commands(run_app_str, output_path, cmd_str, cmd_files, interact):
|
||||
to_stdout = False
|
||||
if output_path == '-':
|
||||
if not output_path or output_path == '-':
|
||||
to_stdout = True
|
||||
output = sys.stdout
|
||||
else:
|
||||
|
@ -419,7 +424,7 @@ def run_commands(run_app_str, output_path, cmd_str, cmd_files, interact):
|
|||
except:
|
||||
traceback.print_exc()
|
||||
|
||||
def verify_transcripts(run_app_str, transcript_files, interact, verbose):
|
||||
def main_verify_transcripts(run_app_str, transcript_files, interact, verbose):
|
||||
results = []
|
||||
for t in transcript_files:
|
||||
passed = verify_application(run_app_str=run_app_str,
|
||||
|
@ -438,18 +443,4 @@ def verify_transcripts(run_app_str, transcript_files, interact, verbose):
|
|||
if not all_passed:
|
||||
sys.exit(1)
|
||||
|
||||
def main(run_app_str, output_path, cmd_str, transcript_files, interact, verbose):
|
||||
|
||||
# If there is a command to run, pipe to stdout by default.
|
||||
# If there are no input files nor commands to run, read from stdin
|
||||
# and write results to stdout.
|
||||
if not output_path:
|
||||
if cmd_str or (not cmd_str and not transcript_files):
|
||||
output_path = '-'
|
||||
|
||||
if output_path:
|
||||
run_commands(run_app_str, output_path, cmd_str, transcript_files, interact)
|
||||
else:
|
||||
verify_transcripts(run_app_str, transcript_files, interact, verbose)
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
|
|
@ -0,0 +1,100 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
|
||||
# All rights reserved.
|
||||
#
|
||||
# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Run CTRL commands or test transcripts against a given application. Commandline
|
||||
invocation exposes only direct command piping, the transcript verification code
|
||||
is exposed as commandline args by osmo_verify_transcript_ctrl.py.
|
||||
'''
|
||||
|
||||
import re
|
||||
|
||||
from osmopy.osmo_interact_common import *
|
||||
from osmopy.osmo_ipa import Ctrl, IPA
|
||||
|
||||
class InteractCtrl(Interact):
|
||||
next_id = 1
|
||||
keep_ids = True
|
||||
re_command = re.compile('^(SET|GET) ([^ ]*) (.*)$')
|
||||
|
||||
class CtrlStep(Interact.StepBase):
|
||||
|
||||
@staticmethod
|
||||
def is_next_step(line, interact_instance):
|
||||
m = InteractCtrl.re_command.match(line)
|
||||
if not m:
|
||||
return None
|
||||
next_step = InteractCtrl.CtrlStep()
|
||||
|
||||
set_get = m.group(1)
|
||||
cmd_id = m.group(2)
|
||||
var_val = m.group(3)
|
||||
if not interact_instance.keep_ids:
|
||||
cmd_id = interact_instance.next_id
|
||||
interact_instance.next_id += 1
|
||||
next_step.command = '%s %s %s' % (set_get, cmd_id, var_val)
|
||||
|
||||
return next_step
|
||||
|
||||
def __init__(self, port, host, verbose=False, update=False, keep_ids=True):
|
||||
if not update:
|
||||
keep_ids = True
|
||||
self.keep_ids = keep_ids
|
||||
super().__init__(InteractCtrl.CtrlStep, port=port, host=host, verbose=verbose, update=update)
|
||||
|
||||
def connect(self):
|
||||
self.next_id = 1
|
||||
super().connect()
|
||||
|
||||
def send(self, data):
|
||||
data = Ctrl().add_header(data)
|
||||
return self.socket.send(data) == len(data)
|
||||
|
||||
def receive(self):
|
||||
responses = []
|
||||
data = self.socket.recv(4096)
|
||||
while (len(data)>0):
|
||||
(response_with_header, data) = IPA().split_combined(data)
|
||||
response = Ctrl().rem_header(response_with_header)
|
||||
responses.append(response.decode('utf-8'))
|
||||
return responses
|
||||
|
||||
def command(self, command):
|
||||
assert self.send(command)
|
||||
res = self.receive()
|
||||
split_responses = []
|
||||
for r in res:
|
||||
split_responses.extend(r.splitlines())
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
return split_responses
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = common_parser()
|
||||
parser_add_run_args(parser)
|
||||
args = parser.parse_args()
|
||||
|
||||
interact = InteractCtrl(args.port, args.host, verbose=False, update=False,
|
||||
keep_ids=True)
|
||||
|
||||
main_run_commands(args.run_app_str, args.output_path, args.cmd_str,
|
||||
args.cmd_files, interact)
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
|
|
@ -0,0 +1,179 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
|
||||
# All rights reserved.
|
||||
#
|
||||
# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Run VTY commands or test transcripts against a given application. Commandline
|
||||
invocation exposes only direct command piping, the transcript verification code
|
||||
is exposed as commandline args by osmo_verify_transcript_vty.py.
|
||||
'''
|
||||
|
||||
import re
|
||||
|
||||
from osmopy.osmo_interact_common import *
|
||||
|
||||
class InteractVty(Interact):
|
||||
|
||||
class VtyStep(Interact.StepBase):
|
||||
expect_node = None # e.g. '(config-net)'
|
||||
expect_prompt_char = None # '>' or '#'
|
||||
|
||||
def __init__(self, prompt):
|
||||
super().__init__()
|
||||
self.prompt = prompt
|
||||
|
||||
def verify_interact_state(self, interact_instance):
|
||||
if interact_instance.update:
|
||||
return
|
||||
if interact_instance.this_node != self.expect_node:
|
||||
raise Exception('Mismatch: expected VTY node %r in the prompt, got %r'
|
||||
% (self.expect_node, interact_instance.this_node))
|
||||
if interact_instance.this_prompt_char != self.expect_prompt_char:
|
||||
raise Exception('Mismatch: expected VTY prompt character %r, got %r'
|
||||
% (self.expect_prompt_char, interact_instance.this_prompt_char))
|
||||
|
||||
@staticmethod
|
||||
def is_next_step(line, interact_instance):
|
||||
m = interact_instance.re_prompt.match(line)
|
||||
if not m:
|
||||
return None
|
||||
next_step = InteractVty.VtyStep(interact_instance.prompt)
|
||||
next_step.expect_node = m.group(1)
|
||||
next_step.expect_prompt_char = m.group(2)
|
||||
next_step.command = m.group(3)
|
||||
return next_step
|
||||
|
||||
def command_str(self, interact_instance=None):
|
||||
if interact_instance is None:
|
||||
node = self.expect_node
|
||||
prompt_char = self.expect_prompt_char
|
||||
else:
|
||||
node = interact_instance.last_node
|
||||
prompt_char = interact_instance.last_prompt_char
|
||||
if node:
|
||||
node = '(%s)' % node
|
||||
node = node or ''
|
||||
return '%s%s%s %s' % (self.prompt, node, prompt_char, self.command)
|
||||
|
||||
def __init__(self, prompt, port, host, verbose, update):
|
||||
self.prompt = prompt
|
||||
super().__init__(InteractVty.VtyStep, port, host, verbose, update)
|
||||
|
||||
def connect(self):
|
||||
self.this_node = None
|
||||
self.this_prompt_char = '>' # slight cheat for initial prompt char
|
||||
self.last_node = None
|
||||
self.last_prompt_char = None
|
||||
|
||||
super().connect()
|
||||
# receive the first welcome message and discard
|
||||
data = self.socket.recv(4096)
|
||||
if not self.prompt:
|
||||
b = data
|
||||
b = b[b.rfind(b'\n') + 1:]
|
||||
while b and (b[0] < ord('A') or b[0] > ord('z')):
|
||||
b = b[1:]
|
||||
prompt_str = b.decode('utf-8')
|
||||
if '>' in prompt_str:
|
||||
self.prompt = prompt_str[:prompt_str.find('>')]
|
||||
if not self.prompt:
|
||||
raise Exception('Could not find application name; needed to decode prompts.'
|
||||
' Initial data was: %r' % data)
|
||||
self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)
|
||||
|
||||
def _command(self, command_str, timeout=10):
|
||||
self.socket.send(command_str.encode())
|
||||
|
||||
waited_since = time.time()
|
||||
received_lines = []
|
||||
last_line = ''
|
||||
|
||||
while True:
|
||||
new_data = self.socket.recv(4096).decode('utf-8')
|
||||
|
||||
last_line = "%s%s" % (last_line, new_data)
|
||||
|
||||
if last_line:
|
||||
lines = last_line.splitlines()
|
||||
received_lines.extend(lines[:-1])
|
||||
last_line = lines[-1]
|
||||
|
||||
match = self.re_prompt.match(last_line)
|
||||
if not match:
|
||||
if time.time() - waited_since > timeout:
|
||||
raise IOError("Failed to read data (did the app crash?)")
|
||||
time.sleep(.1)
|
||||
continue
|
||||
|
||||
self.last_node = self.this_node
|
||||
self.last_prompt_char = self.this_prompt_char
|
||||
self.this_node = match.group(1) or None
|
||||
self.this_prompt_char = match.group(2)
|
||||
break
|
||||
|
||||
# expecting to have received the command we sent as echo, remove it
|
||||
clean_command_str = command_str.strip()
|
||||
if clean_command_str.endswith('?'):
|
||||
clean_command_str = clean_command_str[:-1]
|
||||
if received_lines and received_lines[0] == clean_command_str:
|
||||
received_lines = received_lines[1:]
|
||||
return received_lines
|
||||
|
||||
def command(self, command_str, timeout=10):
|
||||
command_str = command_str or '\r'
|
||||
if command_str[-1] not in '?\r\t':
|
||||
command_str = command_str + '\r'
|
||||
|
||||
received_lines = self._command(command_str, timeout)
|
||||
|
||||
if command_str[-1] == '?':
|
||||
self._command('\x03', timeout)
|
||||
|
||||
return received_lines
|
||||
|
||||
def parser_add_vty_args(parser):
|
||||
parser.add_argument('-n', '--prompt-name', dest='prompt',
|
||||
help="Name used in application's telnet VTY prompt."
|
||||
" If omitted, will attempt to determine the name from"
|
||||
" the initial VTY prompt.")
|
||||
return parser
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = common_parser()
|
||||
parser_add_vty_args(parser)
|
||||
parser_add_run_args(parser)
|
||||
parser.add_argument('-X', '--gen-xml-ref', dest='gen_xml', action='store_true',
|
||||
help="Equivalent to '-c \"show online-help\" -O -',"
|
||||
" can be used to generate the VTY reference file as"
|
||||
" required by osmo-gsm-manuals.git.")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.gen_xml:
|
||||
if args.cmd_str:
|
||||
raise Exception('It does not make sense to pass both --command and'
|
||||
' --gen-xml-ref.')
|
||||
args.cmd_str = 'show online-help'
|
||||
|
||||
interact = InteractVty(args.prompt, args.port, args.host,
|
||||
verbose=False, update=False)
|
||||
|
||||
main_run_commands(args.run_app_str, args.output_path, args.cmd_str,
|
||||
args.cmd_files, interact)
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
|
|
@ -40,70 +40,11 @@ This module can either be run directly to run or update a given CTRL transcript,
|
|||
or it can be imported as a module to run more complex setups.
|
||||
'''
|
||||
|
||||
import re
|
||||
|
||||
from osmopy.osmo_verify_transcript_common import *
|
||||
from osmopy.osmo_ipa import Ctrl, IPA
|
||||
|
||||
class InteractCtrl(Interact):
|
||||
next_id = 1
|
||||
keep_ids = True
|
||||
re_command = re.compile('^(SET|GET) ([^ ]*) (.*)$')
|
||||
|
||||
class CtrlStep(Interact.StepBase):
|
||||
|
||||
@staticmethod
|
||||
def is_next_step(line, interact_instance):
|
||||
m = InteractCtrl.re_command.match(line)
|
||||
if not m:
|
||||
return None
|
||||
next_step = InteractCtrl.CtrlStep()
|
||||
|
||||
set_get = m.group(1)
|
||||
cmd_id = m.group(2)
|
||||
var_val = m.group(3)
|
||||
if not interact_instance.keep_ids:
|
||||
cmd_id = interact_instance.next_id
|
||||
interact_instance.next_id += 1
|
||||
next_step.command = '%s %s %s' % (set_get, cmd_id, var_val)
|
||||
|
||||
return next_step
|
||||
|
||||
def __init__(self, port, host, verbose=False, update=False, keep_ids=True):
|
||||
if not update:
|
||||
keep_ids = True
|
||||
self.keep_ids = keep_ids
|
||||
super().__init__(InteractCtrl.CtrlStep, port=port, host=host, verbose=verbose, update=update)
|
||||
|
||||
def connect(self):
|
||||
self.next_id = 1
|
||||
super().connect()
|
||||
|
||||
def send(self, data):
|
||||
data = Ctrl().add_header(data)
|
||||
return self.socket.send(data) == len(data)
|
||||
|
||||
def receive(self):
|
||||
responses = []
|
||||
data = self.socket.recv(4096)
|
||||
while (len(data)>0):
|
||||
(response_with_header, data) = IPA().split_combined(data)
|
||||
response = Ctrl().rem_header(response_with_header)
|
||||
responses.append(response.decode('utf-8'))
|
||||
return responses
|
||||
|
||||
def command(self, command):
|
||||
assert self.send(command)
|
||||
res = self.receive()
|
||||
split_responses = []
|
||||
for r in res:
|
||||
split_responses.extend(r.splitlines())
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
return split_responses
|
||||
from osmopy.osmo_interact_ctrl import *
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = common_parser()
|
||||
parser_add_verify_args(parser)
|
||||
parser.add_argument('-i', '--keep-ids', dest='keep_ids', action='store_true',
|
||||
help='With --update, default is to overwrite the command IDs'
|
||||
' so that they are consecutive numbers starting from 1.'
|
||||
|
@ -112,7 +53,6 @@ if __name__ == '__main__':
|
|||
|
||||
interact = InteractCtrl(args.port, args.host, args.verbose, args.update, args.keep_ids)
|
||||
|
||||
main(args.run_app_str, args.output_path, args.cmd_str,
|
||||
args.transcript_files, interact, args.verbose)
|
||||
main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose)
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
|
||||
|
|
|
@ -52,148 +52,16 @@ or it can be imported as a module to run more complex setups.
|
|||
|
||||
import re
|
||||
|
||||
from osmopy.osmo_verify_transcript_common import *
|
||||
|
||||
class InteractVty(Interact):
|
||||
|
||||
class VtyStep(Interact.StepBase):
|
||||
expect_node = None # e.g. '(config-net)'
|
||||
expect_prompt_char = None # '>' or '#'
|
||||
|
||||
def __init__(self, prompt):
|
||||
super().__init__()
|
||||
self.prompt = prompt
|
||||
|
||||
def verify_interact_state(self, interact_instance):
|
||||
if interact_instance.update:
|
||||
return
|
||||
if interact_instance.this_node != self.expect_node:
|
||||
raise Exception('Mismatch: expected VTY node %r in the prompt, got %r'
|
||||
% (self.expect_node, interact_instance.this_node))
|
||||
if interact_instance.this_prompt_char != self.expect_prompt_char:
|
||||
raise Exception('Mismatch: expected VTY prompt character %r, got %r'
|
||||
% (self.expect_prompt_char, interact_instance.this_prompt_char))
|
||||
|
||||
@staticmethod
|
||||
def is_next_step(line, interact_instance):
|
||||
m = interact_instance.re_prompt.match(line)
|
||||
if not m:
|
||||
return None
|
||||
next_step = InteractVty.VtyStep(interact_instance.prompt)
|
||||
next_step.expect_node = m.group(1)
|
||||
next_step.expect_prompt_char = m.group(2)
|
||||
next_step.command = m.group(3)
|
||||
return next_step
|
||||
|
||||
def command_str(self, interact_instance=None):
|
||||
if interact_instance is None:
|
||||
node = self.expect_node
|
||||
prompt_char = self.expect_prompt_char
|
||||
else:
|
||||
node = interact_instance.last_node
|
||||
prompt_char = interact_instance.last_prompt_char
|
||||
if node:
|
||||
node = '(%s)' % node
|
||||
node = node or ''
|
||||
return '%s%s%s %s' % (self.prompt, node, prompt_char, self.command)
|
||||
|
||||
def __init__(self, prompt, port, host, verbose, update):
|
||||
self.prompt = prompt
|
||||
super().__init__(InteractVty.VtyStep, port, host, verbose, update)
|
||||
|
||||
def connect(self):
|
||||
self.this_node = None
|
||||
self.this_prompt_char = '>' # slight cheat for initial prompt char
|
||||
self.last_node = None
|
||||
self.last_prompt_char = None
|
||||
|
||||
super().connect()
|
||||
# receive the first welcome message and discard
|
||||
data = self.socket.recv(4096)
|
||||
if not self.prompt:
|
||||
b = data
|
||||
b = b[b.rfind(b'\n') + 1:]
|
||||
while b and (b[0] < ord('A') or b[0] > ord('z')):
|
||||
b = b[1:]
|
||||
prompt_str = b.decode('utf-8')
|
||||
if '>' in prompt_str:
|
||||
self.prompt = prompt_str[:prompt_str.find('>')]
|
||||
if not self.prompt:
|
||||
raise Exception('Could not find application name; needed to decode prompts.'
|
||||
' Initial data was: %r' % data)
|
||||
self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)
|
||||
|
||||
def _command(self, command_str, timeout=10):
|
||||
self.socket.send(command_str.encode())
|
||||
|
||||
waited_since = time.time()
|
||||
received_lines = []
|
||||
last_line = ''
|
||||
|
||||
while True:
|
||||
new_data = self.socket.recv(4096).decode('utf-8')
|
||||
|
||||
last_line = "%s%s" % (last_line, new_data)
|
||||
|
||||
if last_line:
|
||||
lines = last_line.splitlines()
|
||||
received_lines.extend(lines[:-1])
|
||||
last_line = lines[-1]
|
||||
|
||||
match = self.re_prompt.match(last_line)
|
||||
if not match:
|
||||
if time.time() - waited_since > timeout:
|
||||
raise IOError("Failed to read data (did the app crash?)")
|
||||
time.sleep(.1)
|
||||
continue
|
||||
|
||||
self.last_node = self.this_node
|
||||
self.last_prompt_char = self.this_prompt_char
|
||||
self.this_node = match.group(1) or None
|
||||
self.this_prompt_char = match.group(2)
|
||||
break
|
||||
|
||||
# expecting to have received the command we sent as echo, remove it
|
||||
clean_command_str = command_str.strip()
|
||||
if clean_command_str.endswith('?'):
|
||||
clean_command_str = clean_command_str[:-1]
|
||||
if received_lines and received_lines[0] == clean_command_str:
|
||||
received_lines = received_lines[1:]
|
||||
return received_lines
|
||||
|
||||
def command(self, command_str, timeout=10):
|
||||
command_str = command_str or '\r'
|
||||
if command_str[-1] not in '?\r\t':
|
||||
command_str = command_str + '\r'
|
||||
|
||||
received_lines = self._command(command_str, timeout)
|
||||
|
||||
if command_str[-1] == '?':
|
||||
self._command('\x03', timeout)
|
||||
|
||||
return received_lines
|
||||
from osmopy.osmo_interact_vty import *
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = common_parser()
|
||||
parser.add_argument('-n', '--prompt-name', dest='prompt',
|
||||
help="Name used in application's telnet VTY prompt."
|
||||
" If omitted, will attempt to determine the name from"
|
||||
" the initial VTY prompt.")
|
||||
parser.add_argument('-X', '--gen-xml-ref', dest='gen_xml', action='store_true',
|
||||
help="Equivalent to '-c \"show online-help\" -O -',"
|
||||
" can be used to generate the VTY reference file as"
|
||||
" required by osmo-gsm-manuals.git.")
|
||||
parser_add_vty_args(parser)
|
||||
parser_add_verify_args(parser)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.gen_xml:
|
||||
if args.cmd_str:
|
||||
raise Exception('It does not make sense to pass both --command and'
|
||||
' --gen-xml-ref.')
|
||||
args.cmd_str = 'show online-help'
|
||||
|
||||
interact = InteractVty(args.prompt, args.port, args.host, args.verbose, args.update)
|
||||
|
||||
main(args.run_app_str, args.output_path, args.cmd_str,
|
||||
args.transcript_files, interact, args.verbose)
|
||||
main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose)
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
|
||||
|
|
4
setup.py
4
setup.py
|
@ -23,7 +23,9 @@ if sys.version_info.major == 2:
|
|||
scripts = ["osmopy/osmodumpdoc.py", "osmopy/osmotestconfig.py",
|
||||
"osmopy/osmotestvty.py"]
|
||||
elif sys.version_info.major == 3:
|
||||
scripts = ["osmopy/osmo_verify_transcript_vty.py",
|
||||
scripts = ["osmopy/osmo_interact_vty.py",
|
||||
"osmopy/osmo_interact_ctrl.py",
|
||||
"osmopy/osmo_verify_transcript_vty.py",
|
||||
"osmopy/osmo_verify_transcript_ctrl.py"]
|
||||
|
||||
setup(
|
||||
|
|
Loading…
Reference in New Issue