diff --git a/osmopy/__init__.py b/osmopy/__init__.py index e211108..01ff3de 100644 --- a/osmopy/__init__.py +++ b/osmopy/__init__.py @@ -3,4 +3,7 @@ __version__ = '0.0.1' __all__ = ['obscvty', 'osmodumpdoc', 'osmotestconfig', 'osmotestvty', 'osmoutil', - 'osmo_ipa',] + 'osmo_ipa', + 'osmo_verify_transcript_common', + 'osmo_verify_transcript_vty', + 'osmo_verify_transcript_ctrl'] diff --git a/osmopy/osmo_verify_transcript_common.py b/osmopy/osmo_verify_transcript_common.py new file mode 100644 index 0000000..e4b5553 --- /dev/null +++ b/osmopy/osmo_verify_transcript_common.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +# +# (C) 2017 by sysmocom s.f.m.c. GmbH +# All rights reserved. +# +# Author: Neels Hofmeyr +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +''' +Common code for verify_transcript_vty.py and verify_transcript_ctrl.py. +''' + +import argparse +import sys +import os +import subprocess +import time +import traceback +import socket +import shlex + + +class Interact: + + class StepBase: + command = None + result = None + leading_blanks = None + + def __init__(self): + self.result = [] + + def verify_interact_state(self, interact_instance): + # for example to verify that the last VTY prompt received shows the + # right node. + pass + + def command_str(self, interact_instance=None): + return self.command + + def __str__(self): + return '%s\n%s' % (self.command_str(), '\n'.join(self.result)) + + @staticmethod + def is_next_step(line, interact_instance): + assert not "implemented by InteractVty.VtyStep and InteractCtrl.CtrlStep" + + socket = None + + def __init__(self, step_class, port, host, verbose=False, update=False): + ''' + host is the hostname to connect to. + port is the CTRL port to connect on. + ''' + self.Step = step_class + self.port = port + self.host = host + self.verbose = verbose + self.update = update + + def connect(self): + assert self.socket is None + retries = 30 + took = 0 + while True: + took += 1 + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setblocking(1) + self.socket.connect((self.host, int(self.port))) + except IOError: + retries -= 1 + if retries <= 0: + raise + time.sleep(.1) + continue + break + + def close(self): + if self.socket is None: + return + self.socket.close() + self.socket = None + + def command(self, command): + assert not "implemented separately by InteractVty and InteractCtrl" + + def verify_transcript_file(self, transcript_file): + with open(transcript_file, 'r') as f: + content = f.read() + + try: + result = self.verify_transcript(content) + except: + print('Error while verifying transcript file %r' % transcript_file, file=sys.stderr) + sys.stderr.flush() + raise + + if not self.update: + return + content = '\n'.join(result) + with open(transcript_file, 'w') as f: + f.write(content) + + def verify_transcript(self, transcript): + '''' + transcript is a "screenshot" of a session, a multi-line string + including commands and expected results. + Feed commands to self.command() and verify the expected results. + ''' + + # parse steps + steps = [] + step = None + blank_lines = 0 + for line in transcript.splitlines(): + if not line: + blank_lines += 1 + continue + next_step_started = self.Step.is_next_step(line, self) + if next_step_started: + if step: + steps.append(step) + step = next_step_started + step.leading_blanks = blank_lines + blank_lines = 0 + elif step: + # we only count blank lines directly preceding the start of a + # next step. Insert blank lines in the middle of a response + # back into the response: + if blank_lines: + step.result.extend([''] * blank_lines) + blank_lines = 0 + step.result.append(line) + if step: + steps.append(step) + step = None + + actual_result = [] + + # run steps + step_nr = 0 + for step in steps: + step_nr += 1 + try: + if self.verbose: + if step.leading_blanks: + print('\n' * step.leading_blanks, end='') + print(step.command_str()) + sys.stdout.flush() + + step.verify_interact_state(self) + + res = self.command(step.command) + + if self.verbose: + sys.stderr.flush() + sys.stdout.flush() + print('\n'.join(res)) + sys.stdout.flush() + + if step.leading_blanks: + actual_result.extend([''] * step.leading_blanks) + actual_result.append(step.command_str(self)) + + match_result = self.match_lines(step.result, res) + + if self.update: + if match_result is True: + # preserve any wildcards + actual_result.extend(step.result) + else: + # mismatch, take exactly what came in + actual_result.extend(res) + continue + if match_result is not True: + raise Exception('Result mismatch:\n%s\n\nExpected:\n[\n%s\n]\n\nGot:\n[\n%s\n%s\n]' + % (match_result, step, step.command_str(), '\n'.join(res))) + except: + print('Error during transcript step %d:\n[\n%s\n]' % (step_nr, step), + file=sys.stderr) + sys.stderr.flush() + raise + + # final line ending + actual_result.append('') + return actual_result + + @staticmethod + def match_lines(expect, got): + ''' + Match two lists of strings, allowing certain wildcards: + - In 'expect', if a line is exactly '...', it matches any number of + arbitrary lines in 'got'; the implementation is trivial and skips + lines to the first occurence in 'got' that continues after '...'. + + Return 'True' on match, or a string describing the mismatch. + ''' + def match_line(expect_line, got_line): + return expect_line == got_line + + e = 0 + g = 0 + while e < len(expect): + if expect[e] == '...': + e += 1 + + if e >= len(expect): + # anything left in 'got' is accepted. + return True + + # look for the next occurence of the expected line in 'got' + while g < len(got) and not match_line(expect[e], got[g]): + g += 1 + continue + + if g >= len(got): + return 'Cannot find line %r' % expect[e] + + if not match_line(expect[e], got[g]): + return 'Mismatch:\nExpect:\n%r\nGot:\n%r' % (expect[e], got[g]) + + e += 1 + g += 1 + + if g < len(got): + return 'Did not expect line %r' % got[g] + return True + +def end_process(proc): + if not proc: + return + + rc = proc.poll() + if rc is not None: + print('Process has already terminated with', rc) + proc.wait() + return + + proc.terminate() + time_to_wait_for_term = 5 + wait_step = 0.001 + waited_time = 0 + while True: + # poll returns None if proc is still running + if proc.poll() is not None: + break + waited_time += wait_step + # make wait_step approach 1.0 + wait_step = (1. + 5. * wait_step) / 6. + if waited_time >= time_to_wait_for_term: + break + time.sleep(wait_step) + + if proc.poll() is None: + # termination seems to be slower than that, let's just kill + proc.kill() + print("Killed child process") + elif waited_time > .002: + print("Terminating took %.3fs" % waited_time) + proc.wait() + +class Application: + proc = None + _devnull = None + + @staticmethod + def devnull(): + if Application._devnull is None: + Application._devnull = open(os.devnull, 'w') + return Application._devnull + + def __init__(self, command_tuple, purge_output=True): + self.command_tuple = command_tuple + self.purge_output = purge_output + + def run(self): + out_err = None + if self.purge_output: + out_err = Application.devnull() + + print('Launching: cd %r; %s' % (os.getcwd(), ' '.join(self.command_tuple))) + self.proc = subprocess.Popen(self.command_tuple, stdout=out_err, stderr=out_err) + + def stop(self): + end_process(self.proc) + +def verify_application(command_tuple, interact, transcript_file, verbose): + passed = None + application = None + + sys.stdout.flush() + sys.stderr.flush() + + if command_tuple: + application = Application(command_tuple, purge_output=not verbose) + application.run() + + try: + interact.connect() + interact.verify_transcript_file(transcript_file) + passed = True + except: + traceback.print_exc() + passed = False + interact.close() + + if application: + application.stop() + + sys.stdout.flush() + sys.stderr.flush() + + return passed + +def common_parser(): + parser = argparse.ArgumentParser() + parser.add_argument('-r', '--run', dest='command_str', + help='command to run to launch application to test,' + ' including command line arguments. If omitted, no' + ' application is launched.') + parser.add_argument('-p', '--port', dest='port', + help="Port that the application opens.") + parser.add_argument('-H', '--host', dest='host', default='localhost', + help="Host that the application opens the port on.") + parser.add_argument('-u', '--update', dest='update', action='store_true', + help='Do not verify, but OVERWRITE transcripts based on' + ' the applications current behavior. OVERWRITES TRANSCRIPT' + ' FILES.') + parser.add_argument('-v', '--verbose', action='store_true', + help='Print commands and application output') + parser.add_argument('transcript_files', nargs='*', help='transcript files to verify') + return parser + +def main(command_str, transcript_files, interact, verbose): + + if command_str: + command_tuple = shlex.split(command_str) + else: + command_tuple = None + + results = [] + for t in transcript_files: + passed = verify_application(command_tuple=command_tuple, + interact=interact, + transcript_file=t, + verbose=verbose) + results.append((passed, t)) + + print('\nRESULTS:') + all_passed = True + for passed, t in results: + print('%s: %s' % ('pass' if passed else 'FAIL', t)) + all_passed = all_passed and passed + print() + + if not all_passed: + sys.exit(1) + +# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/osmopy/osmo_verify_transcript_ctrl.py b/osmopy/osmo_verify_transcript_ctrl.py new file mode 100755 index 0000000..7afe4bf --- /dev/null +++ b/osmopy/osmo_verify_transcript_ctrl.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# +# (C) 2017 by sysmocom s.f.m.c. GmbH +# All rights reserved. +# +# Author: Neels Hofmeyr +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +''' +Run CTRL test transcripts against a given application. + +A CTRL transcript contains CTRL commands and their expected results. +It looks like: + +" +SET 1 var val +SET_REPLY 1 var OK +GET 2 var +GET_REPLY 2 var val +" + +The application to be tested is described by +- a binary to run, +- command line arguments to pass to the binary, +- the CTRL port. + +This module can either be run directly to run or update a given CTRL transcript, +or it can be imported as a module to run more complex setups. +''' + +import re + +from osmopy.osmo_verify_transcript_common import * +from osmopy.osmo_ipa import Ctrl, IPA + +class InteractCtrl(Interact): + next_id = 1 + keep_ids = True + re_command = re.compile('^(SET|GET) ([^ ]*) (.*)$') + + class CtrlStep(Interact.StepBase): + + @staticmethod + def is_next_step(line, interact_instance): + m = InteractCtrl.re_command.match(line) + if not m: + return None + next_step = InteractCtrl.CtrlStep() + + set_get = m.group(1) + cmd_id = m.group(2) + var_val = m.group(3) + if not interact_instance.keep_ids: + cmd_id = interact_instance.next_id + interact_instance.next_id += 1 + next_step.command = '%s %s %s' % (set_get, cmd_id, var_val) + + return next_step + + def __init__(self, port, host, verbose=False, update=False, keep_ids=True): + if not update: + keep_ids = True + self.keep_ids = keep_ids + super().__init__(InteractCtrl.CtrlStep, port=port, host=host, verbose=verbose, update=update) + + def connect(self): + self.next_id = 1 + super().connect() + + def send(self, data): + data = Ctrl().add_header(data) + return self.socket.send(data) == len(data) + + def receive(self): + responses = [] + data = self.socket.recv(4096) + while (len(data)>0): + (response_with_header, data) = IPA().split_combined(data) + response = Ctrl().rem_header(response_with_header) + responses.append(response.decode('utf-8')) + return responses + + def command(self, command): + assert self.send(command) + res = self.receive() + split_responses = [] + for r in res: + split_responses.extend(r.splitlines()) + sys.stdout.flush() + sys.stderr.flush() + return split_responses + +if __name__ == '__main__': + parser = common_parser() + parser.add_argument('-i', '--keep-ids', dest='keep_ids', action='store_true', + help='With --update, default is to overwrite the command IDs' + ' so that they are consecutive numbers starting from 1.' + ' With --keep-ids, do not change these command IDs.') + args = parser.parse_args() + + interact = InteractCtrl(args.port, args.host, args.verbose, args.update, args.keep_ids) + + main(command_str=args.command_str, + transcript_files=args.transcript_files, + interact=interact, + verbose=args.verbose) + +# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/osmopy/osmo_verify_transcript_vty.py b/osmopy/osmo_verify_transcript_vty.py new file mode 100755 index 0000000..3a4d630 --- /dev/null +++ b/osmopy/osmo_verify_transcript_vty.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# +# (C) 2017 by sysmocom s.f.m.c. GmbH +# All rights reserved. +# +# Author: Neels Hofmeyr +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +''' +Run VTY test transcripts against a given application. + +A VTY transcript contains VTY commands and their expected results. +It looks like: + +" +OsmoHLR> enable + +OsmoHLR# subscriber show imsi 123456789023000 +% No subscriber for imsi = '123456789023000' +OsmoHLR# subscriber show msisdn 12345 +% No subscriber for msisdn = '12345' + +OsmoHLR# subscriber create imsi 123456789023000 +% Created subscriber 123456789023000 + ID: 1 + IMSI: 123456789023000 + MSISDN: none + No auth data +" + +The application to be tested is described by +- a binary to run, +- command line arguments to pass to the binary, +- the VTY telnet port, +- the application name as printed in the VTY prompt. + +This module can either be run directly to run or update a given VTY transcript, +or it can be imported as a module to run more complex setups. +''' + +import re + +from osmopy.osmo_verify_transcript_common import * + +class InteractVty(Interact): + + class VtyStep(Interact.StepBase): + expect_node = None # e.g. '(config-net)' + expect_prompt_char = None # '>' or '#' + + def __init__(self, prompt): + super().__init__() + self.prompt = prompt + + def verify_interact_state(self, interact_instance): + if interact_instance.update: + return + if interact_instance.this_node != self.expect_node: + raise Exception('Mismatch: expected VTY node %r in the prompt, got %r' + % (self.expect_node, interact_instance.this_node)) + if interact_instance.this_prompt_char != self.expect_prompt_char: + raise Exception('Mismatch: expected VTY prompt character %r, got %r' + % (self.expect_prompt_char, interact_instance.this_prompt_char)) + + @staticmethod + def is_next_step(line, interact_instance): + m = interact_instance.re_prompt.match(line) + if not m: + return None + next_step = InteractVty.VtyStep(interact_instance.prompt) + next_step.expect_node = m.group(1) + next_step.expect_prompt_char = m.group(2) + next_step.command = m.group(3) + return next_step + + def command_str(self, interact_instance=None): + if interact_instance is None: + node = self.expect_node + prompt_char = self.expect_prompt_char + else: + node = interact_instance.last_node + prompt_char = interact_instance.last_prompt_char + if node: + node = '(%s)' % node + node = node or '' + return '%s%s%s %s' % (self.prompt, node, prompt_char, self.command) + + def __init__(self, prompt, port, host, verbose, update): + self.prompt = prompt + self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt) + super().__init__(InteractVty.VtyStep, port, host, verbose, update) + + def connect(self): + self.this_node = None + self.this_prompt_char = '>' # slight cheat for initial prompt char + self.last_node = None + self.last_prompt_char = None + + super().connect() + # receive the first welcome message and discard + self.socket.recv(4096) + + def _command(self, command_str, timeout=10): + self.socket.send(command_str.encode()) + + waited_since = time.time() + received_lines = [] + last_line = '' + + while True: + new_data = self.socket.recv(4096).decode('utf-8') + + last_line = "%s%s" % (last_line, new_data) + + if last_line: + lines = last_line.splitlines() + received_lines.extend(lines[:-1]) + last_line = lines[-1] + + match = self.re_prompt.match(last_line) + if not match: + if time.time() - waited_since > timeout: + raise IOError("Failed to read data (did the app crash?)") + time.sleep(.1) + continue + + self.last_node = self.this_node + self.last_prompt_char = self.this_prompt_char + self.this_node = match.group(1) or None + self.this_prompt_char = match.group(2) + break + + # expecting to have received the command we sent as echo, remove it + clean_command_str = command_str.strip() + if clean_command_str.endswith('?'): + clean_command_str = clean_command_str[:-1] + if received_lines and received_lines[0] == clean_command_str: + received_lines = received_lines[1:] + return received_lines + + def command(self, command_str, timeout=10): + command_str = command_str or '\r' + if command_str[-1] not in '?\r\t': + command_str = command_str + '\r' + + received_lines = self._command(command_str, timeout) + + if command_str[-1] == '?': + self._command('\x03', timeout) + + return received_lines + +if __name__ == '__main__': + parser = common_parser() + parser.add_argument('-n', '--prompt-name', dest='prompt', + help="Name used in application's telnet VTY prompt.") + args = parser.parse_args() + + interact = InteractVty(args.prompt, args.port, args.host, args.verbose, args.update) + + main(command_str=args.command_str, + transcript_files=args.transcript_files, + interact=interact, + verbose=args.verbose) + +# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/setup.py b/setup.py index 44e7fd1..9c25a0d 100755 --- a/setup.py +++ b/setup.py @@ -23,7 +23,9 @@ setup( version = __version__, packages = ["osmopy"], scripts = ["osmopy/osmodumpdoc.py", "osmopy/osmotestconfig.py", - "osmopy/osmotestvty.py"], + "osmopy/osmotestvty.py", + "osmopy/osmo_verify_transcript_vty.py", + "osmopy/osmo_verify_transcript_ctrl.py"], license = "AGPLv3", description = "Osmopython: osmocom testing scripts", author = "Katerina Barone-Adesi",