Move all obj/ references in suite.py to testenv.py

Change-Id: If4ab39be7a97d33e82c5a34e2a10dfec38613a4e
This commit is contained in:
Pau Espin 2020-05-04 20:21:31 +02:00
parent ee217b0a18
commit aa1cbdc75a
7 changed files with 315 additions and 271 deletions

View File

@ -1,5 +1,5 @@
from osmo_gsm_tester.testenv import *
print('hello world')
print('I am %r / %r' % (suite.name(), test.name()))
print('I am %r / %r' % (suite.suite().name(), test.name()))
print('one\ntwo\nthree')

View File

@ -1,5 +1,5 @@
from osmo_gsm_tester.testenv import *
print('I am %r / %r' % (suite.name(), test.name()))
print('I am %r / %r' % (suite.suite().name(), test.name()))
assert False

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
from osmo_gsm_tester.testenv import *
print('I am %r / %r' % (suite.name(), test.name()))
print('I am %r / %r' % (suite.suite().name(), test.name()))
test.set_fail('EpicFail', 'This failure is expected')

View File

@ -21,9 +21,13 @@ import os
import sys
import time
import traceback
from .. import testenv
from . import log, util, resource
from . import log
from . import util
from . import resource
from .event_loop import MainLoop
from .. import testenv
class Test(log.Origin):
UNKNOWN = 'UNKNOWN' # matches junit 'error'
@ -51,12 +55,13 @@ class Test(log.Origin):
return self._run_dir
def run(self):
testenv_obj = None
try:
self.log_target = log.FileLogTarget(self.get_run_dir().new_child('log')).set_all_levels(log.L_DBG).style_change(trace=True)
log.large_separator(self.suite_run.trial.name(), self.suite_run.name(), self.name(), sublevel=3)
self.status = Test.UNKNOWN
self.start_timestamp = time.time()
testenv.setup(self.suite_run, self)
testenv_obj = testenv.setup(self.suite_run, self)
with self.redirect_stdout():
util.run_python_file('%s.%s' % (self.suite_run.definition.name(), self.basename),
self.path)
@ -83,6 +88,8 @@ class Test(log.Origin):
self.err('TEST RUN ABORTED: %s' % type(e).__name__)
raise
finally:
if testenv_obj:
testenv_obj.stop()
if self.log_target:
self.log_target.remove()

View File

@ -21,18 +21,12 @@ import os
import sys
import time
import pprint
from .core import config, log, util, process, schema, resource
from .core import config
from .core import log
from .core import util
from .core import schema
from .core import resource
from .core import test
from .core.event_loop import MainLoop
from .obj import nitb_osmo, hlr_osmo, mgcpgw_osmo, mgw_osmo, msc_osmo, bsc_osmo, stp_osmo, ggsn_osmo, sgsn_osmo, esme, osmocon, ms_driver, iperf3
from .obj import run_node
from .obj import epc
from .obj import enb
from .obj import bts
from .obj import ms
class Timeout(Exception):
pass
class SuiteDefinition(log.Origin):
'''A test suite reserves resources for a number of tests.
@ -74,12 +68,9 @@ class SuiteRun(log.Origin):
self.start_timestamp = None
self.duration = None
self.reserved_resources = None
self.objects_to_clean_up = None
self.test_import_modules_to_clean_up = []
self._resource_requirements = None
self._resource_modifiers = None
self._config = None
self._processes = []
self._run_dir = None
self.trial = trial
self.definition = suite_definition
@ -93,40 +84,6 @@ class SuiteRun(log.Origin):
for test_basename in self.definition.test_basenames:
self.tests.append(test.Test(self, test_basename))
def register_for_cleanup(self, *obj):
assert all([hasattr(o, 'cleanup') for o in obj])
self.objects_to_clean_up = self.objects_to_clean_up or []
self.objects_to_clean_up.extend(obj)
def objects_cleanup(self):
while self.objects_to_clean_up:
obj = self.objects_to_clean_up.pop()
try:
obj.cleanup()
except Exception:
log.log_exn()
def test_import_modules_register_for_cleanup(self, mod):
'''
Tests are required to call this API for any module loaded from its own
lib subdir, because they are loaded in the global namespace. Otherwise
later tests importing modules with the same name will re-use an already
loaded module.
'''
if mod not in self.test_import_modules_to_clean_up:
self.dbg('registering module %r for cleanup' % mod)
self.test_import_modules_to_clean_up.append(mod)
def test_import_modules_cleanup(self):
while self.test_import_modules_to_clean_up:
mod = self.test_import_modules_to_clean_up.pop()
try:
self.dbg('Cleaning up module %r' % mod)
del sys.modules[mod.__name__]
del mod
except Exception:
log.log_exn()
def mark_start(self):
self.start_timestamp = time.time()
self.duration = 0
@ -155,11 +112,6 @@ class SuiteRun(log.Origin):
self._run_dir = util.Dir(self.trial.get_run_dir().new_dir(self.name()))
return self._run_dir
def get_test_run_dir(self):
if self.current_test:
return self.current_test.get_run_dir()
return self.get_run_dir()
def resource_requirements(self):
if self._resource_requirements is None:
self._resource_requirements = self.combined('resources')
@ -175,19 +127,24 @@ class SuiteRun(log.Origin):
self._config = self.combined('config', False)
return self._config
def resource_pool(self):
return self.resources_pool
def reserve_resources(self):
if self.reserved_resources:
raise RuntimeError('Attempt to reserve resources twice for a SuiteRun')
self.log('reserving resources in', self.resources_pool.state_dir, '...')
self.reserved_resources = self.resources_pool.reserve(self, self.resource_requirements(), self.resource_modifiers())
def get_reserved_resource(self, resource_class_str, specifics):
return self.reserved_resources.get(resource_class_str, specifics=specifics)
def run_tests(self, names=None):
suite_libdir = os.path.join(self.definition.suite_dir, 'lib')
try:
log.large_separator(self.trial.name(), self.name(), sublevel=2)
self.mark_start()
util.import_path_prepend(suite_libdir)
MainLoop.register_poll_func(self.poll)
if not self.reserved_resources:
self.reserve_resources()
for t in self.tests:
@ -196,9 +153,6 @@ class SuiteRun(log.Origin):
continue
self.current_test = t
t.run()
self.stop_processes()
self.objects_cleanup()
self.reserved_resources.put_all()
except Exception:
log.log_exn()
except BaseException as e:
@ -206,14 +160,7 @@ class SuiteRun(log.Origin):
self.err('SUITE RUN ABORTED: %s' % type(e).__name__)
raise
finally:
# if sys.exit() called from signal handler (e.g. SIGINT), SystemExit
# base exception is raised. Make sure to stop processes in this
# finally section. Resources are automatically freed with 'atexit'.
self.stop_processes()
self.objects_cleanup()
self.free_resources()
MainLoop.unregister_poll_func(self.poll)
self.test_import_modules_cleanup()
util.import_path_remove(suite_libdir)
self.duration = time.time() - self.start_timestamp
@ -245,203 +192,11 @@ class SuiteRun(log.Origin):
errors += 1
return (passed, skipped, failed, errors)
def remember_to_stop(self, process, respawn=False):
'''Ask suite to monitor and manage lifecycle of the Process object. If a
process managed by suite finishes before cleanup time, the current test
will be marked as FAIL and end immediatelly. If respwan=True, then suite
will respawn() the process instead.'''
self._processes.insert(0, (process, respawn))
def stop_processes(self):
if len(self._processes) == 0:
return
strategy = process.ParallelTerminationStrategy()
while self._processes:
proc, _ = self._processes.pop()
strategy.add_process(proc)
strategy.terminate_all()
def stop_process(self, process):
'Remove process from monitored list and stop it'
for proc_respawn in self._processes:
proc, respawn = proc_respawn
if proc == process:
self._processes.remove(proc_respawn)
proc.terminate()
def free_resources(self):
if self.reserved_resources is None:
return
self.reserved_resources.free()
def ip_address(self, specifics=None):
return self.reserved_resources.get(resource.R_IP_ADDRESS, specifics=specifics)
def nitb(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return nitb_osmo.OsmoNitb(self, ip_address)
def hlr(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return hlr_osmo.OsmoHlr(self, ip_address)
def ggsn(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return ggsn_osmo.OsmoGgsn(self, ip_address)
def sgsn(self, hlr, ggsn, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return sgsn_osmo.OsmoSgsn(self, hlr, ggsn, ip_address)
def mgcpgw(self, ip_address=None, bts_ip=None):
if ip_address is None:
ip_address = self.ip_address()
return mgcpgw_osmo.OsmoMgcpgw(self, ip_address, bts_ip)
def mgw(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return mgw_osmo.OsmoMgw(self, ip_address)
def msc(self, hlr, mgcpgw, stp, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return msc_osmo.OsmoMsc(self, hlr, mgcpgw, stp, ip_address)
def bsc(self, msc, mgw, stp, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return bsc_osmo.OsmoBsc(self, msc, mgw, stp, ip_address)
def stp(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return stp_osmo.OsmoStp(self, ip_address)
def ms_driver(self):
ms = ms_driver.MsDriver(self)
self.register_for_cleanup(ms)
return ms
def bts(self, specifics=None):
bts_obj = bts.Bts.get_instance_by_type(self, self.reserved_resources.get(resource.R_BTS, specifics=specifics))
bts_obj.set_lac(self.lac())
bts_obj.set_rac(self.rac())
bts_obj.set_cellid(self.cellid())
bts_obj.set_bvci(self.bvci())
self.register_for_cleanup(bts_obj)
return bts_obj
def modem(self, specifics=None):
conf = self.reserved_resources.get(resource.R_MODEM, specifics=specifics)
ms_obj = ms.MS.get_instance_by_type(self, conf)
self.register_for_cleanup(ms_obj)
return ms_obj
def modems(self, count):
l = []
for i in range(count):
l.append(self.modem())
return l
def all_resources(self, resource_func):
"""Returns all yielded resource."""
l = []
while True:
try:
l.append(resource_func())
except resource.NoResourceExn:
return l
def esme(self):
esme_obj = esme.Esme(self.msisdn())
self.register_for_cleanup(esme_obj)
return esme_obj
def run_node(self, specifics=None):
return run_node.RunNode.from_conf(self.reserved_resources.get(resource.R_RUN_NODE, specifics=specifics))
def enb(self, specifics=None):
enb_obj = enb.eNodeB.get_instance_by_type(self, self.reserved_resources.get(resource.R_ENB, specifics=specifics))
self.register_for_cleanup(enb_obj)
return enb_obj
def epc(self, run_node=None):
if run_node is None:
run_node = self.run_node()
epc_obj = epc.EPC.get_instance_by_type(self, run_node)
self.register_for_cleanup(epc_obj)
return epc_obj
def osmocon(self, specifics=None):
conf = self.reserved_resources.get(resource.R_OSMOCON, specifics=specifics)
osmocon_obj = osmocon.Osmocon(self, conf=conf)
self.register_for_cleanup(osmocon_obj)
return osmocon_obj
def iperf3srv(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
iperf3srv_obj = iperf3.IPerf3Server(self, ip_address)
return iperf3srv_obj
def msisdn(self):
msisdn = self.resources_pool.next_msisdn(self)
self.log('using MSISDN', msisdn)
return msisdn
def lac(self):
lac = self.resources_pool.next_lac(self)
self.log('using LAC', lac)
return lac
def rac(self):
rac = self.resources_pool.next_rac(self)
self.log('using RAC', rac)
return rac
def cellid(self):
cellid = self.resources_pool.next_cellid(self)
self.log('using CellId', cellid)
return cellid
def bvci(self):
bvci = self.resources_pool.next_bvci(self)
self.log('using BVCI', bvci)
return bvci
def poll(self):
for proc, respawn in self._processes:
if proc.terminated():
if respawn == True:
proc.respawn()
else:
proc.log_stdout_tail()
proc.log_stderr_tail()
log.ctx(proc)
raise log.Error('Process ended prematurely: %s' % proc.name())
def prompt(self, *msgs, **msg_details):
'ask for user interaction. Do not use in tests that should run automatically!'
if msg_details:
msgs = list(msgs)
msgs.append('{%s}' %
(', '.join(['%s=%r' % (k,v)
for k,v in sorted(msg_details.items())])))
msg = ' '.join(msgs) or 'Hit Enter to continue'
self.log('prompt:', msg)
sys.__stdout__.write('\n\n--- PROMPT ---\n')
sys.__stdout__.write(msg)
sys.__stdout__.write('\n')
sys.__stdout__.flush()
entered = util.input_polling('> ', MainLoop.poll)
self.log('prompt entered:', repr(entered))
return entered
def resource_status_str(self):
return '\n'.join(('',
'SUITE RUN: %s' % self.origin_id(),

View File

@ -20,6 +20,22 @@
# These will be initialized before each test run.
# A test script can thus establish its context by doing:
# from osmo_gsm_tester.testenv import *
import sys
from .core import process
from .core import log as log_module
from .core import process as process_module
from .core import resource
from .core.event_loop import MainLoop
from .obj import nitb_osmo, hlr_osmo, mgcpgw_osmo, mgw_osmo, msc_osmo, bsc_osmo, stp_osmo, ggsn_osmo, sgsn_osmo, esme, osmocon, ms_driver, iperf3
from .obj import run_node
from .obj import epc
from .obj import enb
from .obj import bts
from .obj import ms
trial = None
suite = None
test = None
@ -32,22 +48,284 @@ wait_no_raise = None
sleep = None
poll = None
prompt = None
Timeout = None
Sms = None
process = None
class Timeout(Exception):
pass
class TestEnv(log_module.Origin):
def __init__(self, suite_run, test):
super().__init__(log_module.C_TST, test.name())
self.suite_run = suite_run
self._test = test
self.trial = suite_run.trial # backward compat with objects
self.resources_pool = suite_run.resource_pool() # backward compat with objects
self._processes = []
self.test_import_modules_to_clean_up = []
self.objects_to_clean_up = None
MainLoop.register_poll_func(self.poll)
def suite(self):
return self.suite_run
# backward compat with objects
def get_test_run_dir(self):
return self._test.get_run_dir()
# backward compat with objects
def config(self):
return self.suite_run.config()
def remember_to_stop(self, process, respawn=False):
'''Ask suite to monitor and manage lifecycle of the Process object. If a
process managed by suite finishes before cleanup time, the current test
will be marked as FAIL and end immediatelly. If respwan=True, then suite
will respawn() the process instead.'''
self._processes.insert(0, (process, respawn))
def stop_processes(self):
if len(self._processes) == 0:
return
strategy = process_module.ParallelTerminationStrategy()
while self._processes:
proc, _ = self._processes.pop()
strategy.add_process(proc)
strategy.terminate_all()
def stop_process(self, process):
'Remove process from monitored list and stop it'
for proc_respawn in self._processes:
proc, respawn = proc_respawn
if proc == process:
self._processes.remove(proc_respawn)
proc.terminate()
def register_for_cleanup(self, *obj):
assert all([hasattr(o, 'cleanup') for o in obj])
self.objects_to_clean_up = self.objects_to_clean_up or []
self.objects_to_clean_up.extend(obj)
def objects_cleanup(self):
while self.objects_to_clean_up:
obj = self.objects_to_clean_up.pop()
try:
obj.cleanup()
except Exception:
log_module.log_exn()
def test_import_modules_register_for_cleanup(self, mod):
'''
Tests are required to call this API for any module loaded from its own
lib subdir, because they are loaded in the global namespace. Otherwise
later tests importing modules with the same name will re-use an already
loaded module.
'''
if mod not in self.test_import_modules_to_clean_up:
self.dbg('registering module %r for cleanup' % mod)
self.test_import_modules_to_clean_up.append(mod)
def test_import_modules_cleanup(self):
while self.test_import_modules_to_clean_up:
mod = self.test_import_modules_to_clean_up.pop()
try:
self.dbg('Cleaning up module %r' % mod)
del sys.modules[mod.__name__]
del mod
except Exception:
log_module.log_exn()
def poll(self):
for proc, respawn in self._processes:
if proc.terminated():
if respawn == True:
proc.respawn()
else:
proc.log_stdout_tail()
proc.log_stderr_tail()
log_module.ctx(proc)
raise log_module.Error('Process ended prematurely: %s' % proc.name())
def stop(self):
# if sys.exit() called from signal handler (e.g. SIGINT), SystemExit
# base exception is raised. Make sure to stop processes in this
# finally section. Resources are automatically freed with 'atexit'.
self.stop_processes()
self.objects_cleanup()
self.suite_run.reserved_resources.put_all()
MainLoop.unregister_poll_func(self.poll)
self.test_import_modules_cleanup()
def prompt(self, *msgs, **msg_details):
'ask for user interaction. Do not use in tests that should run automatically!'
if msg_details:
msgs = list(msgs)
msgs.append('{%s}' %
(', '.join(['%s=%r' % (k,v)
for k,v in sorted(msg_details.items())])))
msg = ' '.join(msgs) or 'Hit Enter to continue'
self.log('prompt:', msg)
sys.__stdout__.write('\n\n--- PROMPT ---\n')
sys.__stdout__.write(msg)
sys.__stdout__.write('\n')
sys.__stdout__.flush()
entered = util.input_polling('> ', MainLoop.poll)
self.log('prompt entered:', repr(entered))
return entered
def get_reserved_resource(self, resource_class_str, specifics=None):
return self.suite_run.get_reserved_resource(resource_class_str, specifics)
def ip_address(self, specifics=None):
return self.get_reserved_resource(resource.R_IP_ADDRESS, specifics)
def nitb(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return nitb_osmo.OsmoNitb(self, ip_address)
def hlr(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return hlr_osmo.OsmoHlr(self, ip_address)
def ggsn(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return ggsn_osmo.OsmoGgsn(self, ip_address)
def sgsn(self, hlr, ggsn, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return sgsn_osmo.OsmoSgsn(self, hlr, ggsn, ip_address)
def mgcpgw(self, ip_address=None, bts_ip=None):
if ip_address is None:
ip_address = self.ip_address()
return mgcpgw_osmo.OsmoMgcpgw(self, ip_address, bts_ip)
def mgw(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return mgw_osmo.OsmoMgw(self, ip_address)
def msc(self, hlr, mgcpgw, stp, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return msc_osmo.OsmoMsc(self, hlr, mgcpgw, stp, ip_address)
def bsc(self, msc, mgw, stp, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return bsc_osmo.OsmoBsc(self, msc, mgw, stp, ip_address)
def stp(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
return stp_osmo.OsmoStp(self, ip_address)
def ms_driver(self):
ms = ms_driver.MsDriver(self)
self.register_for_cleanup(ms)
return ms
def bts(self, specifics=None):
bts_obj = bts.Bts.get_instance_by_type(self, self.get_reserved_resource(resource.R_BTS, specifics=specifics))
bts_obj.set_lac(self.lac())
bts_obj.set_rac(self.rac())
bts_obj.set_cellid(self.cellid())
bts_obj.set_bvci(self.bvci())
self.register_for_cleanup(bts_obj)
return bts_obj
def modem(self, specifics=None):
conf = self.get_reserved_resource(resource.R_MODEM, specifics=specifics)
ms_obj = ms.MS.get_instance_by_type(self, conf)
self.register_for_cleanup(ms_obj)
return ms_obj
def modems(self, count):
l = []
for i in range(count):
l.append(self.modem())
return l
def all_resources(self, resource_func):
"""Returns all yielded resource."""
l = []
while True:
try:
l.append(resource_func())
except resource.NoResourceExn:
return l
def esme(self):
esme_obj = esme.Esme(self.msisdn())
self.register_for_cleanup(esme_obj)
return esme_obj
def run_node(self, specifics=None):
return run_node.RunNode.from_conf(self.get_reserved_resource(resource.R_RUN_NODE, specifics=specifics))
def enb(self, specifics=None):
enb_obj = enb.eNodeB.get_instance_by_type(self, self.get_reserved_resource(resource.R_ENB, specifics=specifics))
self.register_for_cleanup(enb_obj)
return enb_obj
def epc(self, run_node=None):
if run_node is None:
run_node = self.run_node()
epc_obj = epc.EPC.get_instance_by_type(self, run_node)
self.register_for_cleanup(epc_obj)
return epc_obj
def osmocon(self, specifics=None):
conf = self.get_reserved_resource(resource.R_OSMOCON, specifics=specifics)
osmocon_obj = osmocon.Osmocon(self, conf=conf)
self.register_for_cleanup(osmocon_obj)
return osmocon_obj
def iperf3srv(self, ip_address=None):
if ip_address is None:
ip_address = self.ip_address()
iperf3srv_obj = iperf3.IPerf3Server(self, ip_address)
return iperf3srv_obj
def msisdn(self):
msisdn = self.suite_run.resource_pool().next_msisdn(self)
self.log('using MSISDN', msisdn)
return msisdn
def lac(self):
lac = self.suite_run.resource_pool().next_lac(self)
self.log('using LAC', lac)
return lac
def rac(self):
rac = self.suite_run.resource_pool().next_rac(self)
self.log('using RAC', rac)
return rac
def cellid(self):
cellid = self.suite_run.resource_pool().next_cellid(self)
self.log('using CellId', cellid)
return cellid
def bvci(self):
bvci = self.suite_run.resource_pool().next_bvci(self)
self.log('using BVCI', bvci)
return bvci
def setup(suite_run, _test):
from .core import process as process_module
from .core.event_loop import MainLoop
from .obj.sms import Sms as Sms_class
from . import suite as suite_module
global trial, suite, test, resources, log, dbg, err, wait, wait_no_raise, sleep, poll, prompt, Timeout, Sms, process
global trial, suite, test, resources, log, dbg, err, wait, wait_no_raise, sleep, poll, prompt, Sms, process
trial = suite_run.trial
suite = suite_run
test = _test
resources = suite_run.reserved_resources
resources = suite_run.reserved_resources # TODO: remove this global, only used in selftest
log = test.log
dbg = test.dbg
err = test.err
@ -55,9 +333,10 @@ def setup(suite_run, _test):
wait_no_raise = lambda *args, **kwargs: MainLoop.wait_no_raise(suite_run, *args, **kwargs)
sleep = lambda *args, **kwargs: MainLoop.sleep(suite_run, *args, **kwargs)
poll = MainLoop.poll
prompt = suite_run.prompt
Timeout = suite_module.Timeout
Sms = Sms_class
process = process_module
suite = TestEnv(suite_run, _test) # stored in "suite" for backward compatibility
prompt = suite.prompt
return suite
# vim: expandtab tabstop=4 shiftwidth=4

View File

@ -22,7 +22,10 @@ import time
import shutil
import tarfile
from .core import log, util, report
from .core import log
from .core import util
from .core import report
from . import suite
FILE_MARK_TAKEN = 'taken'