480 lines
16 KiB
Python
480 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
|
|
# All rights reserved.
|
|
#
|
|
# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
'''
|
|
Common code for VTY and CTRL interface interaction and transcript verification.
|
|
This implements all of application interaction, piping and verification.
|
|
vty.py and ctrl.py plug VTY and CTRL interface specific bits.
|
|
'''
|
|
|
|
# Our setup.py currently wants everything to be parsable by both py2 and py3.
|
|
# IMHO that is not a good idea, but until that changes, let's just keep this
|
|
# py2 legacy shim in here so we can syntax-check this py3 module with py2.
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import time
|
|
import traceback
|
|
import socket
|
|
import shlex
|
|
import re
|
|
|
|
|
|
class Interact:
|
|
|
|
class StepBase:
|
|
command = None
|
|
result = None
|
|
leading_blanks = None
|
|
|
|
def __init__(self):
|
|
self.result = []
|
|
|
|
def verify_interact_state(self, interact_instance):
|
|
# for example to verify that the last VTY prompt received shows the
|
|
# right node.
|
|
pass
|
|
|
|
def command_str(self, interact_instance=None):
|
|
return self.command
|
|
|
|
def __str__(self):
|
|
return '%s\n%s' % (self.command_str(), '\n'.join(self.result))
|
|
|
|
@staticmethod
|
|
def is_next_step(line, interact_instance):
|
|
assert not "implemented by InteractVty.VtyStep and InteractCtrl.CtrlStep"
|
|
|
|
socket = None
|
|
|
|
def __init__(self, step_class, port, host, verbose=False, update=False):
|
|
'''
|
|
host is the hostname to connect to.
|
|
port is the CTRL port to connect on.
|
|
'''
|
|
self.Step = step_class
|
|
self.port = port
|
|
self.host = host
|
|
self.verbose = verbose
|
|
self.update = update
|
|
|
|
if not port:
|
|
raise Exception("You need to provide port number to connect to")
|
|
|
|
def connect(self):
|
|
assert self.socket is None
|
|
retries = 30
|
|
took = 0
|
|
while True:
|
|
took += 1
|
|
try:
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.socket.setblocking(1)
|
|
self.socket.connect((self.host, int(self.port)))
|
|
except IOError:
|
|
retries -= 1
|
|
if retries <= 0:
|
|
raise
|
|
time.sleep(.1)
|
|
continue
|
|
break
|
|
|
|
def close(self):
|
|
if self.socket is None:
|
|
return
|
|
self.socket.close()
|
|
self.socket = None
|
|
|
|
def command(self, command):
|
|
assert not "implemented separately by InteractVty and InteractCtrl"
|
|
|
|
def verify_transcript_file(self, transcript_file):
|
|
with open(transcript_file, 'r') as f:
|
|
content = f.read()
|
|
|
|
try:
|
|
result = self.verify_transcript(content)
|
|
except:
|
|
print('Error while verifying transcript file %r' % transcript_file, file=sys.stderr)
|
|
sys.stderr.flush()
|
|
raise
|
|
|
|
if not self.update:
|
|
return
|
|
content = '\n'.join(result)
|
|
with open(transcript_file, 'w') as f:
|
|
f.write(content)
|
|
|
|
def verify_transcript(self, transcript):
|
|
''''
|
|
transcript is a "screenshot" of a session, a multi-line string
|
|
including commands and expected results.
|
|
Feed commands to self.command() and verify the expected results.
|
|
'''
|
|
|
|
# parse steps
|
|
steps = []
|
|
step = None
|
|
blank_lines = 0
|
|
for line in transcript.splitlines():
|
|
if not line:
|
|
blank_lines += 1
|
|
continue
|
|
next_step_started = self.Step.is_next_step(line, self)
|
|
if next_step_started:
|
|
if step:
|
|
steps.append(step)
|
|
step = next_step_started
|
|
step.leading_blanks = blank_lines
|
|
blank_lines = 0
|
|
elif step:
|
|
# we only count blank lines directly preceding the start of a
|
|
# next step. Insert blank lines in the middle of a response
|
|
# back into the response:
|
|
if blank_lines:
|
|
step.result.extend([''] * blank_lines)
|
|
blank_lines = 0
|
|
step.result.append(line)
|
|
if step:
|
|
steps.append(step)
|
|
step = None
|
|
|
|
actual_result = []
|
|
|
|
# run steps
|
|
step_nr = 0
|
|
for step in steps:
|
|
step_nr += 1
|
|
try:
|
|
if self.verbose:
|
|
if step.leading_blanks:
|
|
print('\n' * step.leading_blanks, end='')
|
|
print(step.command_str())
|
|
sys.stdout.flush()
|
|
|
|
step.verify_interact_state(self)
|
|
|
|
res = self.command(step.command)
|
|
# trailing empty lines in the command output cannot be preserved because we allow
|
|
# arbitrary newlines between commands. Do not even track these.
|
|
while res and not res[-1]:
|
|
res = res[:-1]
|
|
|
|
if self.verbose:
|
|
sys.stderr.flush()
|
|
sys.stdout.flush()
|
|
print('\n'.join(res))
|
|
sys.stdout.flush()
|
|
|
|
if step.leading_blanks:
|
|
actual_result.extend([''] * step.leading_blanks)
|
|
actual_result.append(step.command_str(self))
|
|
|
|
match_result = self.match_lines(step.result, res)
|
|
|
|
if self.update:
|
|
if match_result is True:
|
|
# preserve any wildcards
|
|
actual_result.extend(step.result)
|
|
else:
|
|
# mismatch, take exactly what came in
|
|
actual_result.extend(res)
|
|
continue
|
|
if match_result is not True:
|
|
raise Exception('Result mismatch:\n%s\n\nExpected:\n[\n%s\n]\n\nGot:\n[\n%s\n%s\n]'
|
|
% (match_result, step, step.command_str(), '\n'.join(res)))
|
|
except:
|
|
print('Error during transcript step %d:\n[\n%s\n]' % (step_nr, step),
|
|
file=sys.stderr)
|
|
sys.stderr.flush()
|
|
raise
|
|
|
|
# final line ending
|
|
actual_result.append('')
|
|
return actual_result
|
|
|
|
@staticmethod
|
|
def match_lines(expect, got):
|
|
'''
|
|
Match two lists of strings, allowing certain wildcards:
|
|
- In 'expect', if a line is exactly '...', it matches any number of
|
|
arbitrary lines in 'got'; the implementation is trivial and skips
|
|
lines to the first occurence in 'got' that continues after '...'.
|
|
- If an 'expect' line is '... !regex', it matches any number of
|
|
lines like '...', but the given regex must not match any of those
|
|
lines.
|
|
|
|
Return 'True' on match, or a string describing the mismatch.
|
|
'''
|
|
def match_line(expect_line, got_line):
|
|
return expect_line == got_line
|
|
|
|
ANY = '...'
|
|
ANY_EXCEPT = '... !'
|
|
|
|
e = 0
|
|
g = 0
|
|
while e < len(expect):
|
|
if expect[e] == ANY or expect[e].startswith(ANY_EXCEPT):
|
|
wildcard = expect[e]
|
|
e += 1
|
|
g_end = g
|
|
|
|
if e >= len(expect):
|
|
# anything left in 'got' is accepted.
|
|
g_end = len(got)
|
|
|
|
# look for the next occurence of the expected line in 'got'
|
|
while g_end < len(got) and not match_line(expect[e], got[g_end]):
|
|
g_end += 1
|
|
|
|
if wildcard == ANY:
|
|
# no restrictions on lines
|
|
g = g_end
|
|
|
|
elif wildcard.startswith(ANY_EXCEPT):
|
|
except_re = re.compile(wildcard[len(ANY_EXCEPT):])
|
|
while g < g_end:
|
|
if except_re.search(got[g]):
|
|
return ('Got forbidden line for wildcard %r:'
|
|
' did not expect %r in line %d of response'
|
|
% (wildcard, got[g], g))
|
|
g += 1
|
|
|
|
continue
|
|
|
|
if g >= len(got):
|
|
return 'Cannot find line %r' % expect[e]
|
|
|
|
if not match_line(expect[e], got[g]):
|
|
return 'Mismatch:\nExpect:\n%r\nGot:\n%r' % (expect[e], got[g])
|
|
|
|
e += 1
|
|
g += 1
|
|
|
|
if g < len(got):
|
|
return 'Did not expect line %r' % got[g]
|
|
return True
|
|
|
|
def feed_commands(self, output, command_strs):
|
|
for command_str in command_strs:
|
|
for command in command_str.splitlines():
|
|
res = self.command(command)
|
|
output.write('\n'.join(res))
|
|
output.write('\n')
|
|
|
|
def end_process(proc, quiet=False):
|
|
if not proc:
|
|
return
|
|
|
|
rc = proc.poll()
|
|
if rc is not None:
|
|
if not quiet:
|
|
print('Process has already terminated with', rc)
|
|
proc.wait()
|
|
return
|
|
|
|
proc.terminate()
|
|
time_to_wait_for_term = 5
|
|
wait_step = 0.001
|
|
waited_time = 0
|
|
while True:
|
|
# poll returns None if proc is still running
|
|
if proc.poll() is not None:
|
|
break
|
|
waited_time += wait_step
|
|
# make wait_step approach 1.0
|
|
wait_step = (1. + 5. * wait_step) / 6.
|
|
if waited_time >= time_to_wait_for_term:
|
|
break
|
|
time.sleep(wait_step)
|
|
|
|
if proc.poll() is None:
|
|
# termination seems to be slower than that, let's just kill
|
|
proc.kill()
|
|
if not quiet:
|
|
print("Killed child process")
|
|
elif waited_time > .002:
|
|
if not quiet:
|
|
print("Terminating took %.3fs" % waited_time)
|
|
proc.wait()
|
|
|
|
class Application:
|
|
proc = None
|
|
_devnull = None
|
|
|
|
@staticmethod
|
|
def devnull():
|
|
if Application._devnull is None:
|
|
Application._devnull = open(os.devnull, 'w')
|
|
return Application._devnull
|
|
|
|
def __init__(self, run_app_str, purge_output=True, quiet=False):
|
|
self.command_tuple = shlex.split(run_app_str)
|
|
self.purge_output = purge_output
|
|
self.quiet = quiet
|
|
|
|
def run(self):
|
|
out_err = None
|
|
if self.purge_output:
|
|
out_err = Application.devnull()
|
|
|
|
if not self.quiet:
|
|
print('Launching: cd %r; %s' % (os.getcwd(), ' '.join(self.command_tuple)))
|
|
self.proc = subprocess.Popen(self.command_tuple, stdout=out_err, stderr=out_err)
|
|
|
|
def stop(self):
|
|
end_process(self.proc, self.quiet)
|
|
|
|
def verify_application(run_app_str, interact, transcript_file, verbose):
|
|
passed = None
|
|
application = None
|
|
|
|
sys.stdout.flush()
|
|
sys.stderr.flush()
|
|
|
|
if run_app_str:
|
|
application = Application(run_app_str, purge_output=not verbose)
|
|
application.run()
|
|
|
|
try:
|
|
interact.connect()
|
|
interact.verify_transcript_file(transcript_file)
|
|
passed = True
|
|
except:
|
|
traceback.print_exc()
|
|
passed = False
|
|
interact.close()
|
|
|
|
if application:
|
|
application.stop()
|
|
|
|
sys.stdout.flush()
|
|
sys.stderr.flush()
|
|
|
|
return passed
|
|
|
|
def common_parser(doc=None):
|
|
parser = argparse.ArgumentParser(description=doc,
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
parser.add_argument('-r', '--run', dest='run_app_str',
|
|
help='command to run to launch application to test,'
|
|
' including command line arguments. If omitted, no'
|
|
' application is launched.')
|
|
parser.add_argument('-p', '--port', dest='port',
|
|
help="Port to reach the application at.")
|
|
parser.add_argument('-H', '--host', dest='host', default='localhost',
|
|
help="Host to reach the application at.")
|
|
return parser
|
|
|
|
def parser_add_verify_args(parser):
|
|
parser.add_argument('-u', '--update', dest='update', action='store_true',
|
|
help='Do not verify, but OVERWRITE transcripts based on'
|
|
' the application\'s current behavior. OVERWRITES TRANSCRIPT'
|
|
' FILES.')
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
help='Print commands and application output')
|
|
parser.add_argument('transcript_files', nargs='*', help='transcript file(s) to verify')
|
|
return parser
|
|
|
|
def parser_add_run_args(parser):
|
|
parser.add_argument('-O', '--output', dest='output_path',
|
|
help="Write command results to a file instead of stdout."
|
|
"('-O -' writes to stdout and is the default)")
|
|
parser.add_argument('-c', '--command', dest='cmd_str',
|
|
help="Run this command (before reading input files, if any)."
|
|
" multiple commands may be separated by ';'")
|
|
parser.add_argument('cmd_files', nargs='*', help='file(s) with plain commands to run')
|
|
return parser
|
|
|
|
def main_run_commands(run_app_str, output_path, cmd_str, cmd_files, interact):
|
|
to_stdout = False
|
|
if not output_path or output_path == '-':
|
|
to_stdout = True
|
|
output = sys.stdout
|
|
else:
|
|
output = open(output_path, 'w')
|
|
|
|
application = None
|
|
|
|
if run_app_str:
|
|
application = Application(run_app_str, quiet=to_stdout)
|
|
application.run()
|
|
|
|
try:
|
|
interact.connect()
|
|
|
|
if cmd_str:
|
|
interact.feed_commands(output, cmd_str.split(';'))
|
|
|
|
for f_path in (cmd_files or []):
|
|
with open(f_path, 'r') as f:
|
|
interact.feed_commands(output, f.read().splitlines())
|
|
|
|
if not (cmd_str or cmd_files):
|
|
while True:
|
|
line = sys.stdin.readline()
|
|
if not line:
|
|
break;
|
|
interact.feed_commands(output, line.split(';'))
|
|
except:
|
|
traceback.print_exc()
|
|
finally:
|
|
if not to_stdout:
|
|
try:
|
|
output.close()
|
|
except:
|
|
traceback.print_exc()
|
|
|
|
try:
|
|
interact.close()
|
|
except:
|
|
traceback.print_exc()
|
|
|
|
if application:
|
|
try:
|
|
application.stop()
|
|
except:
|
|
traceback.print_exc()
|
|
|
|
def main_verify_transcripts(run_app_str, transcript_files, interact, verbose):
|
|
results = []
|
|
for t in transcript_files:
|
|
passed = verify_application(run_app_str=run_app_str,
|
|
interact=interact,
|
|
transcript_file=t,
|
|
verbose=verbose)
|
|
results.append((passed, t))
|
|
|
|
print('\nRESULTS:')
|
|
all_passed = True
|
|
for passed, t in results:
|
|
print('%s: %s' % ('pass' if passed else 'FAIL', t))
|
|
all_passed = all_passed and passed
|
|
print()
|
|
|
|
if not all_passed:
|
|
sys.exit(1)
|
|
|
|
# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
|