test: make it possible to use pytest-style test fixtures

Currently all binaries must be available or no tests will be executed.
This is inconvenient if you just want to test a single binary (e.g.
text2pcap) without having to build epan. The problem is essentially that
tests lack dependency annotations.

To solve this problem, add the required dependencies as parameters to
each test (so-called 'fixtures' in pytest). Skip a test if a binary
(such as tshark) is unavailable. As a demonstration, suite_dissection.py
is converted. Over time, tests should no longer depend on config.py due
to explicit dependencies fixtures (listed in fixtures_ws.py).

Since the unittest module does not support such dependency injections,
create a small glue for use with pytest and an (incomplete) emulation
layer for use with test.py.

Tested with pytest 3.8.2 + Python 3.7.0 and pytest 3.0.3 + Python 3.4.3.
Python 2.7 is not supported and will fail. Test commands:

    ~/wireshark/test/test.py -p ~/build/run
    WS_BIN_PATH=~/build/run pytest ~/wireshark/test -ra

Change-Id: I6dc8c28f5c8b7bbc8f4c04838e9bf085cd22eb0b
Ping-Bug: 14949
Reviewed-on: https://code.wireshark.org/review/30220
Tested-by: Petri Dish Buildbot
Petri-Dish: Peter Wu <peter@lekensteyn.nl>
Reviewed-by: Anders Broman <a.broman58@gmail.com>
This commit is contained in:
Peter Wu 2018-10-15 16:07:30 +02:00 committed by Anders Broman
parent e0ac913300
commit 54d7e96a72
7 changed files with 562 additions and 35 deletions

View File

@ -211,18 +211,14 @@ def setUpTestEnvironment():
'ikev2_decryption_table',
]
for uat in uat_files:
setUpUatFile(uat)
setUpUatFile(conf_path, uat)
# Set up our environment
test_env = baseEnv(home=home_path)
test_env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = 'True'
test_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
def setUpUatFile(conf_file):
global home_path
global conf_path
if home_path is None or conf_path is None:
setUpTestEnvironment()
def setUpUatFile(conf_path, conf_file):
template = os.path.join(os.path.dirname(__file__), 'config', conf_file) + '.tmpl'
with open(template, 'r') as tplt_fd:
tplt_contents = tplt_fd.read()

View File

@ -10,9 +10,9 @@
import os
import sys
import fixtures
import config
# XXX remove globals in config and create py.test-specific fixtures
try:
_program_path = os.environ['WS_BIN_PATH']
@ -32,3 +32,7 @@ def pytest_collection_modifyitems(items):
if name not in suites:
suites.append(name)
config.all_groups = list(sorted(suites))
# Must enable pytest before importing fixtures_ws.
fixtures.enable_pytest()
from fixtures_ws import *

337
test/fixtures.py Normal file
View File

@ -0,0 +1,337 @@
#
# -*- coding: utf-8 -*-
# Extends unittest with support for pytest-style fixtures.
#
# Copyright (c) 2018 Peter Wu <peter@lekensteyn.nl>
#
# SPDX-License-Identifier: (GPL-2.0-or-later OR MIT)
#
import functools
import inspect
import sys
import unittest
_use_native_pytest = False
def enable_pytest():
global _use_native_pytest, pytest
assert not _fallback
import pytest
_use_native_pytest = True
def fixture(scope="function", params=None, autouse=False, ids=None, name=None):
"""
When running under pytest, this is the same as the pytest.fixture decorator.
See https://docs.pytest.org/en/latest/reference.html#pytest-fixture
"""
if _use_native_pytest:
# XXX sorting of fixtures based on scope does not work, see
# https://github.com/pytest-dev/pytest/issues/4143#issuecomment-431794076
# When ran under pytest, use native functionality.
return pytest.fixture(scope, params, autouse, ids, name)
init_fallback_fixtures_once()
return _fallback.fixture(scope, params, autouse, ids, name)
def _fixture_wrapper(test_fn, params):
@functools.wraps(test_fn)
def wrapped(self):
if not _use_native_pytest:
self._fixture_request.function = getattr(self, test_fn.__name__)
self._fixture_request.fillfixtures(params)
fixtures = [self._fixture_request.getfixturevalue(n) for n in params]
test_fn(self, *fixtures)
return wrapped
def uses_fixtures(cls):
"""Enables use of fixtures within test methods of unittest.TestCase."""
assert issubclass(cls, unittest.TestCase)
for name in dir(cls):
func = getattr(cls, name)
if not name.startswith('test') or not callable(func):
continue
params = inspect.getfullargspec(func).args[1:]
# Unconditionally overwrite methods in case usefixtures marks exist.
setattr(cls, name, _fixture_wrapper(func, params))
if _use_native_pytest:
# Make request object to _fixture_wrapper
@pytest.fixture(autouse=True)
def __inject_request(self, request):
self._fixture_request = request
cls.__inject_request = __inject_request
else:
_patch_unittest_testcase_class(cls)
return cls
def mark_usefixtures(*args):
"""Add the given fixtures to every test method."""
if _use_native_pytest:
return pytest.mark.usefixtures(*args)
def wrapper(cls):
cls._fixtures_prepend = list(args)
return cls
return wrapper
# Begin fallback functionality when pytest is not available.
# Supported:
# - session-scoped fixtures (for cmd_tshark)
# - function-scoped fixtures (for tmpfile)
# - teardown (via yield keyword in fixture)
# - sorting of scopes (session before function)
# - fixtures that depend on other fixtures (requires sorting)
# - marking classes with @pytest.mark.usefixtures("fixture")
# Not supported (yet) due to lack of need for it:
# - autouse fixtures
# - parameterized fixtures (@pytest.fixture(params=...))
# - class-scoped fixtures
# - (overriding) fixtures on various levels (e.g. conftest, module, class)
class _FixtureSpec(object):
def __init__(self, name, scope, func):
self.name = name
self.scope = scope
self.func = func
self.params = inspect.getfullargspec(func).args
if inspect.ismethod(self.params):
self.params = self.params[1:] # skip self
def __repr__(self):
return '<_FixtureSpec name=%s scope=%s params=%r>' % \
(self.name, self.scope, self.params)
class _FixturesManager(object):
'''Records collected fixtures when pytest is unavailable.'''
fixtures = {}
# supported scopes, in execution order.
SCOPES = ('session', 'function')
def _add_fixture(self, scope, autouse, name, func):
name = name or func.__name__
if name in self.fixtures:
raise NotImplementedError('overriding fixtures is not supported')
self.fixtures[name] = _FixtureSpec(name, scope, func)
return func
def fixture(self, scope, params, autouse, ids, name):
if params:
raise NotImplementedError('params is not supported')
if ids:
raise NotImplementedError('ids is not supported')
if autouse:
raise NotImplementedError('autouse is not supported yet')
if callable(scope):
# used as decorator, pass through the original function
self._add_fixture('function', autouse, name, scope)
return scope
assert scope in self.SCOPES, 'unsupported scope'
# invoked with arguments, should return a decorator
return lambda func: self._add_fixture(scope, autouse, name, func)
def lookup(self, name):
return self.fixtures.get(name)
def resolve_fixtures(self, fixtures):
'''Find all dependencies for the requested list of fixtures.'''
unresolved = fixtures.copy()
resolved_keys, resolved = [], []
while unresolved:
param = unresolved.pop(0)
if param in resolved:
continue
spec = self.lookup(param)
if not spec:
if param == 'request':
continue
raise RuntimeError("Fixture '%s' not found" % (param,))
unresolved += spec.params
resolved_keys.append(param)
resolved.append(spec)
# Return fixtures, sorted by their scope
resolved.sort(key=lambda spec: self.SCOPES.index(spec.scope))
return resolved
class _ExecutionScope(object):
'''Store execution/teardown state for a scope.'''
def __init__(self, scope, parent):
self.scope = scope
self.parent = parent
self.cache = {}
self.finalizers = []
def _find_scope(self, scope):
context = self
while context.scope != scope:
context = context.parent
return context
def execute(self, spec, test_fn):
'''Execute a fixture and cache the result.'''
context = self._find_scope(spec.scope)
if spec.name in context.cache:
return
try:
value, cleanup = self._execute_one(spec, test_fn)
exc = None
except Exception:
value, cleanup, exc = None, None, sys.exc_info()[1]
context.cache[spec.name] = value, exc
if cleanup:
context.finalizers.append(cleanup)
if exc:
raise exc
def cached_result(self, spec):
'''Obtain the cached result for a previously executed fixture.'''
value, exc = self._find_scope(spec.scope).cache[spec.name]
if exc:
raise exc
return value
def _execute_one(self, spec, test_fn):
# A fixture can only execute in the same or earlier scopes
context_scope_index = _FixturesManager.SCOPES.index(self.scope)
fixture_scope_index = _FixturesManager.SCOPES.index(spec.scope)
assert fixture_scope_index <= context_scope_index
if spec.params:
# Do not invoke destroy, it is taken care of by the main request.
subrequest = _FixtureRequest(self)
subrequest.function = test_fn
subrequest.fillfixtures(spec.params)
fixtures = (subrequest.getfixturevalue(n) for n in spec.params)
value = spec.func(*fixtures) # Execute fixture
else:
value = spec.func() # Execute fixture
if not inspect.isgenerator(value):
return value, None
@functools.wraps(value)
def cleanup():
try:
next(value)
except StopIteration:
pass
else:
raise RuntimeError('%s yielded more than once!' % (spec.name,))
return next(value), cleanup
def destroy(self):
exceptions = []
for cleanup in self.finalizers:
try:
cleanup()
except:
exceptions.append(sys.exc_info()[1])
self.cache.clear()
self.finalizers.clear()
if exceptions:
raise exceptions[0]
class _FixtureRequest(object):
'''
Holds state during a single test execution. See
https://docs.pytest.org/en/latest/reference.html#request
'''
def __init__(self, context):
self._context = context
self._fixtures_prepend = [] # fixtures added via usefixtures
# XXX is there any need for .module or .cls?
self.function = None # test function, set before execution.
def fillfixtures(self, params):
params = self._fixtures_prepend + params
specs = _fallback.resolve_fixtures(params)
for spec in specs:
self._context.execute(spec, self.function)
def getfixturevalue(self, argname):
spec = _fallback.lookup(argname)
if not spec:
assert argname == 'request'
return self
return self._context.cached_result(spec)
def destroy(self):
self._context.destroy()
def addfinalizer(self, finalizer):
self._context.finalizers.append(finalizer)
@property
def instance(self):
return self.function.__self__
def _patch_unittest_testcase_class(cls):
'''
Patch the setUp and tearDown methods of the unittest.TestCase such that the
fixtures are properly setup and destroyed.
'''
def setUp(self):
assert _session_context, 'must call create_session() first!'
function_context = _ExecutionScope('function', _session_context)
req = _FixtureRequest(function_context)
req._fixtures_prepend = getattr(self, '_fixtures_prepend', [])
self._fixture_request = req
self._orig_setUp()
def tearDown(self):
try:
self._orig_tearDown()
finally:
self._fixture_request.destroy()
# Only the leaf test case class should be decorated!
assert not hasattr(cls, '_orig_setUp')
assert not hasattr(cls, '_orig_tearDown')
cls._orig_setUp, cls.setUp = cls.setUp, setUp
cls._orig_tearDown, cls.tearDown = cls.tearDown, tearDown
_fallback = None
_session_context = None
def init_fallback_fixtures_once():
global _fallback
assert not _use_native_pytest
if _fallback:
return
_fallback = _FixturesManager()
# Register standard fixtures here as needed
def create_session():
global _session_context
assert not _use_native_pytest
_session_context = _ExecutionScope('session', None)
def destroy_session():
global _session_context
assert not _use_native_pytest
_session_context = None
def skip(msg):
'''Skip the executing test with the given message.'''
if _use_native_pytest:
pytest.skip(msg)
else:
raise unittest.SkipTest(msg)

174
test/fixtures_ws.py Normal file
View File

@ -0,0 +1,174 @@
#
# -*- coding: utf-8 -*-
# Wireshark tests
#
# Copyright (c) 2018 Peter Wu <peter@lekensteyn.nl>
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Fixtures that are specific to Wireshark.'''
import logging
import os
import re
import subprocess
import sys
import tempfile
import types
import fixtures
import config
import subprocesstest
@fixtures.fixture(scope='session')
def program_path():
# XXX stop using config
return config.program_path
@fixtures.fixture(scope='session')
def program(program_path):
def resolver(name):
dotexe = ''
if sys.platform.startswith('win32'):
dotexe = '.exe'
path = os.path.normpath(os.path.join(program_path, name + dotexe))
if not os.access(path, os.X_OK):
fixtures.skip('Program %s is not available' % (name,))
return path
return resolver
@fixtures.fixture(scope='session')
def cmd_capinfos(program):
return program('capinfos')
@fixtures.fixture(scope='session')
def cmd_dumpcap(program):
return program('dumpcap')
@fixtures.fixture(scope='session')
def cmd_mergecap(program):
return program('mergecap')
@fixtures.fixture(scope='session')
def cmd_rawshark(program):
return program('rawshark')
@fixtures.fixture(scope='session')
def cmd_tshark(program):
return program('tshark')
@fixtures.fixture(scope='session')
def cmd_text2pcap(program):
return program('text2pcap')
@fixtures.fixture(scope='session')
def cmd_wireshark(program):
return program('wireshark')
@fixtures.fixture(scope='session')
def features(cmd_tshark):
'''Returns an object describing available features in tshark.'''
try:
# XXX stop using config
tshark_v = subprocess.check_output(
(cmd_tshark, '--version'),
stderr=subprocess.PIPE,
universal_newlines=True,
env=config.baseEnv()
)
tshark_v = re.sub(r'\s+', ' ', tshark_v)
except subprocess.CalledProcessError as ex:
logging.warning('Failed to detect tshark features: %s', ex)
tshark_v = ''
gcry_m = re.search(r'with +Gcrypt +([0-9]+\.[0-9]+)', tshark_v)
return types.SimpleNamespace(
have_lua='with Lua' in tshark_v,
have_nghttp2='with nghttp2' in tshark_v,
have_kerberos='with MIT Kerberos' in tshark_v or 'with Heimdal Kerberos' in tshark_v,
have_libgcrypt16=gcry_m and float(gcry_m.group(1)) >= 1.6,
have_libgcrypt17=gcry_m and float(gcry_m.group(1)) >= 1.7,
)
@fixtures.fixture(scope='session')
def dirs():
'''Returns fixed directories containing test input.'''
this_dir = os.path.dirname(__file__)
return types.SimpleNamespace(
baseline_dir=os.path.join(this_dir, 'baseline'),
capture_dir=os.path.join(this_dir, 'captures'),
config_dir=os.path.join(this_dir, 'config'),
key_dir=os.path.join(this_dir, 'keys'),
lua_dir=os.path.join(this_dir, 'lua'),
tools_dir=os.path.join(this_dir, '..', 'tools'),
)
@fixtures.fixture
def home_path():
'''Per-test home directory, removed when finished.'''
with tempfile.TemporaryDirectory(prefix='wireshark-tests-home-') as dirname:
yield dirname
@fixtures.fixture
def conf_path(home_path):
'''Path to the Wireshark configuration directory.'''
if sys.platform.startswith('win32'):
conf_path = os.path.join(home_path, 'Wireshark')
else:
conf_path = os.path.join(home_path, '.config', 'wireshark')
os.makedirs(conf_path)
return conf_path
@fixtures.fixture
def base_env(home_path):
"""A modified environment to ensure reproducible tests. Tests can modify
this environment as they see fit."""
env = os.environ.copy()
env['TZ'] = 'UTC'
home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
env[home_env] = home_path
return env
@fixtures.fixture
def test_env(base_env, conf_path, request):
'''A process environment with a populated configuration directory.'''
# Populate our UAT files
uat_files = [
'80211_keys',
'dtlsdecrypttablefile',
'esp_sa',
'ssl_keys',
'c1222_decryption_table',
'ikev1_decryption_table',
'ikev2_decryption_table',
]
for uat in uat_files:
# XXX stop using config
config.setUpUatFile(conf_path, uat)
env = base_env
env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = '1'
env['WIRESHARK_QUIT_AFTER_CAPTURE'] = '1'
# Remove this if test instances no longer inherit from SubprocessTestCase?
assert isinstance(request.instance, subprocesstest.SubprocessTestCase)
# Inject the test environment as default if it was not overridden.
request.instance.injected_test_env = env
return env
# XXX capture: capture_interface
# XXX nameres: setUpHostFiles() / custom_profile_path / custom_profile_name

View File

@ -263,6 +263,14 @@ class SubprocessTestCase(unittest.TestCase):
'''Start a process in the background. Returns a subprocess.Popen object.
You typically wait for it using waitProcess() or assertWaitProcess().'''
if env is None:
# Apply default test environment if no override is provided.
env = getattr(self, 'injected_test_env', None)
# Not all tests need test_env, but those that use runProcess or
# startProcess must either pass an explicit environment or load the
# fixture (via a test method parameter or class decorator).
assert not (env is None and hasattr(self, '_fixture_request')), \
"Decorate class with @fixtures.mark_usefixtures('test_env')"
if env is None:
# Avoid using the test user's real environment by default.
env = config.test_env

View File

@ -9,36 +9,38 @@
#
'''Dissection tests'''
import config
import os.path
import subprocesstest
import unittest
import fixtures
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_dissect_http2(subprocesstest.SubprocessTestCase):
def test_http2_data_reassembly(self):
def test_http2_data_reassembly(self, cmd_tshark, features, dirs):
'''HTTP2 data reassembly'''
if not config.have_nghttp2:
if not features.have_nghttp2:
self.skipTest('Requires nghttp2.')
capture_file = os.path.join(config.capture_dir, 'http2-data-reassembly.pcap')
key_file = os.path.join(config.key_dir, 'http2-data-reassembly.keys')
self.runProcess((config.cmd_tshark,
capture_file = os.path.join(dirs.capture_dir, 'http2-data-reassembly.pcap')
key_file = os.path.join(dirs.key_dir, 'http2-data-reassembly.keys')
self.runProcess((cmd_tshark,
'-r', capture_file,
'-o', 'tls.keylog_file: {}'.format(key_file),
'-d', 'tcp.port==8443,tls',
'-Y', 'http2.data.data matches "PNG" && http2.data.data matches "END"',
),
env=config.test_env)
))
self.assertTrue(self.grepOutput('DATA'))
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_dissect_tcp(subprocesstest.SubprocessTestCase):
def check_tcp_out_of_order(self, extraArgs=[]):
capture_file = os.path.join(config.capture_dir, 'http-ooo.pcap')
self.runProcess([config.cmd_tshark,
def check_tcp_out_of_order(self, cmd_tshark, dirs, extraArgs=[]):
capture_file = os.path.join(dirs.capture_dir, 'http-ooo.pcap')
self.runProcess([cmd_tshark,
'-r', capture_file,
'-otcp.reassemble_out_of_order:TRUE',
'-Y', 'http',
] + extraArgs,
env=config.test_env)
] + extraArgs)
self.assertEqual(self.countOutput('HTTP'), 5)
# TODO PDU /1 (segments in frames 1, 2, 4) should be reassembled in
# frame 4, but it is currently done in frame 6 because the current
@ -50,24 +52,23 @@ class case_dissect_tcp(subprocesstest.SubprocessTestCase):
self.assertTrue(self.grepOutput(r'^\s*11\s.*PUT /4 HTTP/1.1'))
self.assertTrue(self.grepOutput(r'^\s*15\s.*PUT /5 HTTP/1.1'))
def test_tcp_out_of_order_onepass(self):
self.check_tcp_out_of_order()
def test_tcp_out_of_order_onepass(self, cmd_tshark, dirs):
self.check_tcp_out_of_order(cmd_tshark, dirs)
@unittest.skip("MSP splitting is not implemented yet")
def test_tcp_out_of_order_twopass(self):
self.check_tcp_out_of_order(extraArgs=['-2'])
def test_tcp_out_of_order_twopass(self, cmd_tshark, dirs):
self.check_tcp_out_of_order(cmd_tshark, dirs, extraArgs=['-2'])
def test_tcp_out_of_order_twopass_with_bug(self):
def test_tcp_out_of_order_twopass_with_bug(self, cmd_tshark, dirs):
# TODO fix the issue below, remove this and enable
# "test_tcp_out_of_order_twopass"
capture_file = os.path.join(config.capture_dir, 'http-ooo.pcap')
self.runProcess((config.cmd_tshark,
capture_file = os.path.join(dirs.capture_dir, 'http-ooo.pcap')
self.runProcess((cmd_tshark,
'-r', capture_file,
'-otcp.reassemble_out_of_order:TRUE',
'-Y', 'http',
'-2',
),
env=config.test_env)
))
self.assertEqual(self.countOutput('HTTP'), 3)
self.assertTrue(self.grepOutput(r'^\s*7\s.*PUT /1 HTTP/1.1'))
self.assertTrue(self.grepOutput(r'^\s*7\s.*GET /2 HTTP/1.1'))
@ -80,13 +81,12 @@ class case_dissect_tcp(subprocesstest.SubprocessTestCase):
self.assertTrue(self.grepOutput(r'^\s*11\s.*PUT /4 HTTP/1.1'))
self.assertTrue(self.grepOutput(r'^\s*15\s.*PUT /5 HTTP/1.1'))
def test_tcp_out_of_order_data_after_syn(self):
def test_tcp_out_of_order_data_after_syn(self, cmd_tshark, dirs):
'''Test when the first non-empty segment is OoO.'''
capture_file = os.path.join(config.capture_dir, 'dns-ooo.pcap')
proc = self.runProcess((config.cmd_tshark,
capture_file = os.path.join(dirs.capture_dir, 'dns-ooo.pcap')
proc = self.runProcess((cmd_tshark,
'-r', capture_file,
'-otcp.reassemble_out_of_order:TRUE',
'-Y', 'dns', '-Tfields', '-edns.qry.name',
),
env=config.test_env)
))
self.assertEqual(proc.stdout_str.strip(), 'example.com')

View File

@ -21,6 +21,8 @@ import config
import os.path
import sys
import unittest
# Required to make fixtures available to tests!
import fixtures_ws
def find_test_ids(suite, all_ids):
if hasattr(suite, '__iter__'):
@ -139,7 +141,13 @@ def main():
run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids)
runner = unittest.TextTestRunner(verbosity=args.verbose)
test_result = runner.run(run_suite)
# for unittest compatibility (not needed with pytest)
fixtures_ws.fixtures.create_session()
try:
test_result = runner.run(run_suite)
finally:
# for unittest compatibility (not needed with pytest)
fixtures_ws.fixtures.destroy_session()
dump_failed_output(run_suite)