Tests: Require pytest support and remove compatibility layer

Remove the pytest compatibility layer and require the real thing.

Fix running the tests with pytest on Python 3.11+. Pytest strongly
favors fixtures over setup/teardown methods, so use fixtures. This
fixes the test suite with pytest on Python 3.11+ and has the added
benefit of removing the dependency on a private unittest property.
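
As an illustrative sketch only (not part of this change; the names are
hypothetical), the pytest idiom this moves toward replaces the
setUp/tearDown pair with a yield fixture:

    import pytest

    @pytest.fixture(autouse=True)
    def process_cleanup():
        # Everything before the yield runs as per-test setup.
        processes = []
        yield processes
        # Everything after the yield runs as teardown, even when the
        # test fails, so no private unittest state is needed.
        for proc in processes:
            proc.kill()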

We remove the dedicated log file code in SubprocessTestCase and just
write to standard output. The intent is to leverage pytest's own
logging features instead, such as writing to a log file. To make this
more useful we should probably rely on logging calls instead of
writing to stdout.
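
As a hedged sketch of the pytest logging features referred to above
(using the standard caplog fixture and the --log-file option), logging
calls can replace ad-hoc stdout writes:

    import logging

    def test_logging_example(caplog):
        # Capture INFO and above for this test; running with
        #   pytest --log-file=pytest.log --log-file-level=INFO
        # also writes the captured records to a log file.
        caplog.set_level(logging.INFO)
        logging.getLogger(__name__).info('goes through logging, not stdout')
        assert 'goes through logging' in caplog.text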

The teardown log file cleanup logic and the filename_from_id() method
are replaced with pytest fixtures and native temporary path support,
which are cleaner to use and do not require messy teardown logic. The
temporary files are created in the system temporary directory and, by
default, the last three runs are kept.
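
A minimal sketch of that mechanism (the fixture mirrors the new
result_file helper and is shown only for illustration): pytest's
tmp_path fixture hands each test a fresh directory under the system
temporary directory, e.g. /tmp/pytest-of-<user>/pytest-<N>/, and prunes
older runs automatically.

    import pytest

    @pytest.fixture
    def result_file(tmp_path):
        # tmp_path is a per-test pathlib.Path; no manual cleanup needed.
        def real(filename):
            return str(tmp_path / filename)
        return real

    def test_writes_output(result_file):
        outfile = result_file('testout.pcap')
        with open(outfile, 'wb') as f:
            f.write(b'\x0a\x0d\x0d\x0a')  # arbitrary bytes for the sketch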

More work is needed to completely remove the unittest module dependency.

Fixes #18740.
João Valverde 2023-05-08 07:32:58 +01:00 committed by Gerald Combs
parent 99f059c48b
commit 3128269aa0
17 changed files with 150 additions and 783 deletions


@ -4064,52 +4064,6 @@ endif()
# Test suites
enable_testing()
# We could try to build this list dynamically, but given that we tend to
# go years between adding suites just run
# test/test.py --list-groups | sort
# and paste the output here.
set(_test_group_list
suite_capture
suite_clopts
suite_decryption
suite_dfilter.group_bytes
suite_dfilter.group_double
suite_dfilter.group_ether
suite_dfilter.group_function
suite_dfilter.group_integer
suite_dfilter.group_ipv4
suite_dfilter.group_ipv6
suite_dfilter.group_membership
suite_dfilter.group_scanner
suite_dfilter.group_slice
suite_dfilter.group_string
suite_dfilter.group_syntax
suite_dfilter.group_time
suite_dfilter.group_tvb
suite_dissection
suite_dissectors.group_asterix
suite_extcaps
suite_fileformats
suite_follow
suite_follow_dccp
suite_follow_multistream
suite_io
suite_mergecap
suite_nameres
suite_netperfmeter
suite_outputformats
suite_release
suite_sharkd
suite_text2pcap
suite_unittests
suite_wslua
)
# We don't currently handle spaces in arguments. On Windows this
# means that you will probably have to pass in an interface index
# instead of a name.
set(TEST_EXTRA_ARGS "" CACHE STRING "Extra arguments to pass to test/test.py")
separate_arguments(TEST_EXTRA_ARGS)
add_test(build_unittests
"${CMAKE_COMMAND}"
@ -4119,20 +4073,12 @@ add_test(build_unittests
)
set_tests_properties(build_unittests PROPERTIES FIXTURES_SETUP unittests)
foreach(_group_name ${_test_group_list})
add_test(
NAME ${_group_name}
COMMAND ${CMAKE_COMMAND} -E env PYTHONIOENCODING=UTF-8
${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/test/test.py
--verbose
--program-path $<TARGET_FILE_DIR:wmem_test>
${TEST_EXTRA_ARGS}
${_group_name}
)
set_tests_properties(${_group_name} PROPERTIES TIMEOUT 600)
endforeach()
set_tests_properties(suite_unittests PROPERTIES FIXTURES_REQUIRED unittests)
add_test(
NAME all
COMMAND ${CMAKE_COMMAND} -E env PYTHONIOENCODING=UTF-8
${PYTHON_EXECUTABLE} -m pytest
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
)
# Make it possible to run pytest without passing the full path as argument.
if(NOT CMAKE_SOURCE_DIR STREQUAL CMAKE_BINARY_DIR)


@ -32,11 +32,8 @@ capture tests, pass the `--disable-capture` option.
List available tests with `pytest --collectonly`. Enable verbose output
with `pytest --verbose`. For more details, see <<ChTestsRunPytest>>.
If for whatever reason `pytest` is too old or unavailable, you could use
a more limited test runner, `test/test.py`. Use `test/test.py --help` to
see all options. For more details, see <<ChTestsRun>>.
CMake currently runs `test/test.py` when the “test” target is built.
CMake runs pytest when the “test” target is built but the user experience
is poor. It is strongly recommended to invoke pytest directly instead.
[#ChTestsStructure]
=== Test suite structure
@ -66,7 +63,7 @@ be skipped while other tests can still run to completion.
[#TestsLayout]
==== Suites, Cases, and Tests
The `test/test.py` script uses Python's “unittest” module. Our tests are
The test suite uses Python's “unittest” module. Our tests are
patterned after it, and individual tests are organized according to
suites, cases, and individual tests. Suites correspond to Python modules
that match the pattern “suite_*.py”. Cases correspond to one or more
@ -108,58 +105,6 @@ actually request fixture dependencies.
[#ChTestsRun]
=== Listing And Running Tests
Tests can be run via the `test/test.py` Python script. To run all tests,
either run `test/test.py` in the directory that contains the Wireshark
executables (`wireshark`, `tshark`, etc.), or pass the executable
path via the `-p` flag:
[source,sh]
----
$ python3 test/test.py -p /path/to/wireshark-build/run
----
You can list tests by passing one or more complete or partial names to
`test/test.py`. The `-l` flag lists tests. By default all tests are shown.
[source,sh]
----
# List all tests
$ python3 test/test.py -l
$ python3 test/test.py -l all
$ python3 test/test.py --list
$ python3 test/test.py --list all
# List only tests containing "dumpcap"
$ python3 test/test.py -l dumpcap
# List all suites
$ python3 test/test.py --list-suites
# List all suites and cases
$ python3 test/test.py --list-cases
----
If one of the listing flags is not present, tests are run. If no names or `all` is supplied,
all tests are run. Otherwise tests that match are run.
[source,sh]
----
# Run all tests
$ python3 test/test.py
$ python3 test/test.py all
# Only run tests containing "dumpcap"
$ python3 test/test.py dumpcap
# Run the "clopts" suite
$ python3 test/test.py suite_clopts
----
Run `python3 test/test.py --help` for all available options.
[#ChTestsRunPytest]
=== Listing And Running Tests (pytest)
Tests can also be run with https://pytest.org/[pytest]. Advantages include finer
test selection, full parallelism, nicer test execution summaries, better output
in case of failures (containing the contents of variables) and the ability to


@ -7,8 +7,6 @@
#
'''pytest configuration'''
import re
import fixtures
def pytest_addoption(parser):
parser.addoption('--disable-capture', action='store_true',
@ -19,27 +17,5 @@ def pytest_addoption(parser):
help='Skip tests that lack programs from this list instead of failing'
' them. Use "all" to ignore all missing programs.')
_all_test_groups = None
# this is set only to please case_unittests.test_unit_ctest_coverage
def pytest_collection_modifyitems(items):
'''Find all test groups.'''
global _all_test_groups
suites = []
for item in items:
name = item.nodeid.split("::")[0].replace(".py", "")
# When executed from the rootdir (e.g. "pytest test"), be sure to strip
# all preceding components ("test/suite_io" -> "suite_io").
name = re.sub(r'^.*/suite_', 'suite_', name)
name = name.replace("/", ".")
if name not in suites:
suites.append(name)
_all_test_groups = sorted(suites)
# Must enable pytest before importing fixtures_ws.
fixtures.enable_pytest()
from fixtures_ws import *
@fixtures.fixture(scope='session')
def all_test_groups():
return _all_test_groups


@ -12,14 +12,8 @@ import inspect
import sys
import unittest
_use_native_pytest = False
def enable_pytest():
global _use_native_pytest, pytest
assert not _fallback
import pytest
_use_native_pytest = True
import pytest
_use_native_pytest = True
def fixture(callable_or_scope=None, *, scope="function", params=None,
@ -89,289 +83,6 @@ def mark_usefixtures(*args):
return cls
return wrapper
# Begin fallback functionality when pytest is not available.
# Supported:
# - session-scoped fixtures (for cmd_tshark)
# - function-scoped fixtures (for tmpfile)
# - teardown (via yield keyword in fixture)
# - sorting of scopes (session before function)
# - fixtures that depend on other fixtures (requires sorting)
# - marking classes with @pytest.mark.usefixtures("fixture")
# Not supported (yet) due to lack of need for it:
# - autouse fixtures
# - parameterized fixtures (@pytest.fixture(params=...))
# - class-scoped fixtures
# - (overriding) fixtures on various levels (e.g. conftest, module, class)
class _FixtureSpec(object):
def __init__(self, name, scope, func):
self.name = name
self.scope = scope
self.func = func
self.params = inspect.getfullargspec(func).args
if inspect.ismethod(self.params):
self.params = self.params[1:] # skip self
def __repr__(self):
return '<_FixtureSpec name=%s scope=%s params=%r>' % \
(self.name, self.scope, self.params)
class _FixturesManager(object):
'''Records collected fixtures when pytest is unavailable.'''
fixtures = {}
# supported scopes, in execution order.
SCOPES = ('session', 'function')
def _add_fixture(self, scope, autouse, name, func):
name = name or func.__name__
if name in self.fixtures:
raise NotImplementedError('overriding fixtures is not supported')
self.fixtures[name] = _FixtureSpec(name, scope, func)
return func
def fixture(self, scope, params, autouse, ids, name):
if params:
raise NotImplementedError('params is not supported')
if ids:
raise NotImplementedError('ids is not supported')
if autouse:
raise NotImplementedError('autouse is not supported yet')
if callable(scope):
# used as decorator, pass through the original function
self._add_fixture('function', autouse, name, scope)
return scope
assert scope in self.SCOPES, 'unsupported scope'
# invoked with arguments, should return a decorator
return lambda func: self._add_fixture(scope, autouse, name, func)
def lookup(self, name):
return self.fixtures.get(name)
def resolve_fixtures(self, fixtures):
'''Find all dependencies for the requested list of fixtures.'''
unresolved = fixtures.copy()
resolved_keys, resolved = [], []
while unresolved:
param = unresolved.pop(0)
if param in resolved:
continue
spec = self.lookup(param)
if not spec:
if param == 'request':
continue
raise RuntimeError("Fixture '%s' not found" % (param,))
unresolved += spec.params
resolved_keys.append(param)
resolved.append(spec)
# Return fixtures, sorted by their scope
resolved.sort(key=lambda spec: self.SCOPES.index(spec.scope))
return resolved
class _ExecutionScope(object):
'''Store execution/teardown state for a scope.'''
def __init__(self, scope, parent):
self.scope = scope
self.parent = parent
self.cache = {}
self.finalizers = []
def _find_scope(self, scope):
context = self
while context.scope != scope:
context = context.parent
return context
def execute(self, spec, test_fn):
'''Execute a fixture and cache the result.'''
context = self._find_scope(spec.scope)
if spec.name in context.cache:
return
try:
value, cleanup = self._execute_one(spec, test_fn)
exc = None
except Exception:
value, cleanup, exc = None, None, sys.exc_info()[1]
context.cache[spec.name] = value, exc
if cleanup:
context.finalizers.append(cleanup)
if exc:
raise exc
def cached_result(self, spec):
'''Obtain the cached result for a previously executed fixture.'''
entry = self._find_scope(spec.scope).cache.get(spec.name)
if not entry:
return None, False
value, exc = entry
if exc:
raise exc
return value, True
def _execute_one(self, spec, test_fn):
# A fixture can only execute in the same or earlier scopes
context_scope_index = _FixturesManager.SCOPES.index(self.scope)
fixture_scope_index = _FixturesManager.SCOPES.index(spec.scope)
assert fixture_scope_index <= context_scope_index
if spec.params:
# Do not invoke destroy, it is taken care of by the main request.
subrequest = _FixtureRequest(self)
subrequest.function = test_fn
subrequest.fillfixtures(spec.params)
fixtures = (subrequest.getfixturevalue(n) for n in spec.params)
value = spec.func(*fixtures) # Execute fixture
else:
value = spec.func() # Execute fixture
if not inspect.isgenerator(value):
return value, None
@functools.wraps(value)
def cleanup():
try:
next(value)
except StopIteration:
pass
else:
raise RuntimeError('%s yielded more than once!' % (spec.name,))
return next(value), cleanup
def destroy(self):
exceptions = []
for cleanup in self.finalizers:
try:
cleanup()
except Exception:
exceptions.append(sys.exc_info()[1])
self.cache.clear()
self.finalizers.clear()
if exceptions:
raise exceptions[0]
class _FixtureRequest(object):
'''
Holds state during a single test execution. See
https://docs.pytest.org/en/latest/reference.html#request
'''
def __init__(self, context):
self._context = context
self._fixtures_prepend = [] # fixtures added via usefixtures
# XXX is there any need for .module or .cls?
self.function = None # test function, set before execution.
def fillfixtures(self, params):
params = self._fixtures_prepend + params
specs = _fallback.resolve_fixtures(params)
for spec in specs:
self._context.execute(spec, self.function)
def getfixturevalue(self, argname):
spec = _fallback.lookup(argname)
if not spec:
assert argname == 'request'
return self
value, ok = self._context.cached_result(spec)
if not ok:
# If getfixturevalue is called directly from a setUp function, the
# fixture value might not have computed before, so evaluate it now.
# As the test function is not available, use None.
self._context.execute(spec, test_fn=None)
value, ok = self._context.cached_result(spec)
assert ok, 'Failed to execute fixture %s' % (spec,)
return value
def destroy(self):
self._context.destroy()
def addfinalizer(self, finalizer):
self._context.finalizers.append(finalizer)
@property
def instance(self):
return self.function.__self__
@property
def config(self):
'''The pytest config object associated with this request.'''
return _config
def _patch_unittest_testcase_class(cls):
'''
Patch the setUp and tearDown methods of the unittest.TestCase such that the
fixtures are properly setup and destroyed.
'''
def setUp(self):
assert _session_context, 'must call create_session() first!'
function_context = _ExecutionScope('function', _session_context)
req = _FixtureRequest(function_context)
req._fixtures_prepend = getattr(self, '_fixtures_prepend', [])
self._fixture_request = req
self._orig_setUp()
def tearDown(self):
try:
self._orig_tearDown()
finally:
self._fixture_request.destroy()
# Only the leaf test case class should be decorated!
assert not hasattr(cls, '_orig_setUp')
assert not hasattr(cls, '_orig_tearDown')
cls._orig_setUp, cls.setUp = cls.setUp, setUp
cls._orig_tearDown, cls.tearDown = cls.tearDown, tearDown
class _Config(object):
def __init__(self, args):
assert isinstance(args, argparse.Namespace)
self.args = args
def getoption(self, name, default):
'''Partial emulation for pytest Config.getoption.'''
name = name.lstrip('-').replace('-', '_')
return getattr(self.args, name, default)
_fallback = None
_session_context = None
_config = None
def init_fallback_fixtures_once():
global _fallback
assert not _use_native_pytest
if _fallback:
return
_fallback = _FixturesManager()
# Register standard fixtures here as needed
def create_session(args=None):
'''Start a test session where args is from argparse.'''
global _session_context, _config
assert not _use_native_pytest
_session_context = _ExecutionScope('session', None)
if args is None:
args = argparse.Namespace()
_config = _Config(args)
def destroy_session():
global _session_context
assert not _use_native_pytest
_session_context = None
def skip(msg):
'''Skip the executing test with the given message.'''
if _use_native_pytest:
pytest.skip(msg)
else:
raise unittest.SkipTest(msg)
pytest.skip(msg)


@ -209,6 +209,12 @@ def capture_file(dirs):
return os.path.join(dirs.capture_dir, filename)
return resolver
@fixtures.fixture
def result_file(tmp_path):
'''Returns the path to a temporary file.'''
def result_file_real(filename):
return str(tmp_path / filename)
return result_file_real
@fixtures.fixture
def home_path():
@ -382,14 +388,14 @@ def make_screenshot():
@fixtures.fixture
def make_screenshot_on_error(request, make_screenshot):
def make_screenshot_on_error(request, make_screenshot, result_file):
'''Writes a screenshot when a process times out.'''
@contextmanager
def make_screenshot_on_error_real():
try:
yield
except subprocess.TimeoutExpired:
filename = request.instance.filename_from_id('screenshot.png')
filename = result_file('screenshot.png')
make_screenshot(filename)
raise
return make_screenshot_on_error_real


@ -142,8 +142,7 @@ class SubprocessTestCase(unittest.TestCase):
self.exit_open_error = 9
self.exit_code = None
self.log_fname = None
self.log_fd = None
self.log_fd = sys.stdout
self.processes = []
self.cleanup_files = []
self.dump_files = []
@ -151,13 +150,6 @@ class SubprocessTestCase(unittest.TestCase):
def log_fd_write_bytes(self, log_data):
self.log_fd.write(log_data)
def filename_from_id(self, filename):
'''Generate a filename prefixed with our test ID.'''
id_filename = self.id() + '.' + filename
if id_filename not in self.cleanup_files:
self.cleanup_files.append(id_filename)
return id_filename
def kill_processes(self):
'''Kill any processes we've opened so far'''
for proc in self.processes:
@ -166,60 +158,12 @@ class SubprocessTestCase(unittest.TestCase):
except Exception:
pass
def setUp(self):
"""
Set up a single test. Opens a log file and add it to the cleanup list.
"""
self.processes = []
self.log_fname = self.filename_from_id('log')
# Our command line utilities generate UTF-8. The log file endcoding
# needs to match that.
# XXX newline='\n' works for now, but we might have to do more work
# to handle line endings in the future.
self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
self.cleanup_files.append(self.log_fname)
def _last_test_failed(self):
"""Check for non-skipped tests that resulted in errors."""
# The test outcome is not available via the public unittest API, so
# check a private property, "_outcome", set by unittest.TestCase.run.
# It remains None when running in debug mode (`pytest --pdb`).
# The property is available since Python 3.4 until at least Python 3.7.
if self._outcome:
if hasattr(self._outcome, 'errors'):
# Python 3.4 - 3.10
result = self.defaultTestResult()
self._feedErrorsToResult(result, self._outcome.errors)
else:
# Python 3.11+
result = self._outcome.result
for test_case, exc_info in (result.errors + result.failures):
if exc_info:
return True
# No errors occurred or running in debug mode.
return False
def tearDown(self):
"""
Tears down a single test. Kills stray processes and closes the log file.
On errors, display the log contents. On success, remove temporary files.
"""
self.kill_processes()
self.log_fd.close()
if self._last_test_failed():
self.dump_files.append(self.log_fname)
# Leave some evidence behind.
self.cleanup_files = []
print('\nProcess output for {}:'.format(self.id()))
with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
for line in log_fd:
sys.stdout.write(line)
for filename in self.cleanup_files:
try:
os.unlink(filename)
except OSError:
pass
self.cleanup_files = []
def getCaptureInfo(self, capinfos_args=None, cap_file=None):
'''Run capinfos on a capture file and log its output.
@ -229,7 +173,7 @@ class SubprocessTestCase(unittest.TestCase):
# XXX convert users to use a new fixture instead of this function.
cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
if not cap_file:
cap_file = self.filename_from_id('testout.pcap')
cap_file = self._fixture_request.getfixturevalue('result_file')('testout.pcap')
self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
capinfos_cmd = [cmd_capinfos]
if capinfos_args is not None:


@ -83,11 +83,11 @@ def capture_command(*args, shell=False):
@fixtures.fixture
def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator, result_file):
start_traffic, cfilter = traffic_generator
def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
stop_traffic = start_traffic()
if to_stdout:
capture_proc = self.runProcess(capture_command(cmd,
@ -122,14 +122,14 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
@fixtures.fixture
def check_capture_fifo(cmd_dumpcap):
def check_capture_fifo(cmd_dumpcap, result_file):
if sys.platform == 'win32':
fixtures.skip('Test requires OS fifo support.')
def check_capture_fifo_real(self, cmd=None):
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
fifo_file = self.filename_from_id('testout.fifo')
testout_file = result_file(testout_pcap)
fifo_file = result_file('testout.fifo')
try:
# If a previous test left its fifo laying around, e.g. from a failure, remove it.
os.unlink(fifo_file)
@ -153,12 +153,12 @@ def check_capture_fifo(cmd_dumpcap):
@fixtures.fixture
def check_capture_stdin(cmd_dumpcap):
def check_capture_stdin(cmd_dumpcap, result_file):
# Capturing always requires dumpcap, hence the dependency on it.
def check_capture_stdin_real(self, cmd=None):
# Similar to suite_io.check_io_4_packets.
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
capture_cmd = capture_command(cmd,
'-i', '-',
@ -180,11 +180,11 @@ def check_capture_stdin(cmd_dumpcap):
@fixtures.fixture
def check_capture_read_filter(capture_interface, traffic_generator):
def check_capture_read_filter(capture_interface, traffic_generator, result_file):
start_traffic, cfilter = traffic_generator
def check_capture_read_filter_real(self, cmd=None):
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
stop_traffic = start_traffic()
capture_proc = self.assertRun(capture_command(cmd,
'-i', capture_interface,
@ -201,12 +201,12 @@ def check_capture_read_filter(capture_interface, traffic_generator):
return check_capture_read_filter_real
@fixtures.fixture
def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator, result_file):
start_traffic, cfilter = traffic_generator
def check_capture_snapshot_len_real(self, cmd=None):
self.assertIsNotNone(cmd)
stop_traffic = start_traffic()
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
capture_proc = self.assertRun(capture_command(cmd,
'-i', capture_interface,
'-p',
@ -219,7 +219,7 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator)
self.assertTrue(os.path.isfile(testout_file))
# Use tshark to filter out all packets larger than 68 bytes.
testout2_file = self.filename_from_id('testout2.pcap')
testout2_file = result_file('testout2.pcap')
filter_proc = self.assertRun((cmd_tshark,
'-r', testout_file,
@ -231,10 +231,10 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator)
@fixtures.fixture
def check_dumpcap_autostop_stdin(cmd_dumpcap):
def check_dumpcap_autostop_stdin(cmd_dumpcap, result_file):
def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
# Similar to check_capture_stdin.
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
condition='oops:invalid'
@ -307,7 +307,7 @@ def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
@fixtures.fixture
def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, capture_file):
def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, capture_file, result_file):
if sys.platform == 'win32':
fixtures.skip('Test requires OS fifo support.')
def check_dumpcap_pcapng_sections_real(self, multi_input=False, multi_output=False):
@ -333,7 +333,7 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, capture_file):
check_vals = [ check_val_d ]
for in_files in in_files_l:
fifo_file = self.filename_from_id('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files) + 1))
fifo_file = result_file('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files) + 1))
fifo_files.append(fifo_file)
# If a previous test left its fifo laying around, e.g. from a failure, remove it.
try:
@ -350,7 +350,7 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, capture_file):
check_vals.append(check_val_d.copy())
# check_vals[]['filename'] will be filled in below
else:
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
check_vals[0]['filename'] = testout_file
# Capture commands


@ -48,27 +48,27 @@ class case_dumpcap_options(subprocesstest.SubprocessTestCase):
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, capture_interface):
def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, capture_interface, result_file):
'''Invalid capture filter'''
invalid_filter = '__invalid_protocol'
# $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, capture_interface):
def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, capture_interface, result_file):
'''Invalid capture interface name'''
invalid_interface = '__invalid_interface'
# $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no device named "__invalid_interface"'))
def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, capture_interface):
def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, capture_interface, result_file):
'''Invalid capture interface index'''
invalid_index = '0'
# $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
@ -114,27 +114,27 @@ class case_tshark_options(subprocesstest.SubprocessTestCase):
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
def test_tshark_invalid_capfilter(self, cmd_tshark, capture_interface):
def test_tshark_invalid_capfilter(self, cmd_tshark, capture_interface, result_file):
'''Invalid capture filter'''
invalid_filter = '__invalid_protocol'
# $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
self.runProcess((cmd_tshark, '-f', invalid_filter, '-w', testout_file ))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
def test_tshark_invalid_interface_name(self, cmd_tshark, capture_interface):
def test_tshark_invalid_interface_name(self, cmd_tshark, capture_interface, result_file):
'''Invalid capture interface name'''
invalid_interface = '__invalid_interface'
# $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
self.runProcess((cmd_tshark, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no device named "__invalid_interface"'))
def test_tshark_invalid_interface_index(self, cmd_tshark, capture_interface):
def test_tshark_invalid_interface_index(self, cmd_tshark, capture_interface, result_file):
'''Invalid capture interface index'''
invalid_index = '0'
# $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
self.runProcess((cmd_tshark, '-i', invalid_index, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no interface with that adapter index'))


@ -784,11 +784,11 @@ class case_decrypt_kerberos(subprocesstest.SubprocessTestCase):
self.assertTrue(self.grepOutput('ccda7d48219f73c3b28311c4ba7242b3'))
@fixtures.fixture(scope='session')
def run_wireguard_test(cmd_tshark, capture_file, features):
@fixtures.fixture
def run_wireguard_test(cmd_tshark, capture_file, result_file, features):
def runOne(self, args, keylog=None, pcap_file='wireguard-ping-tcp.pcap'):
if keylog:
keylog_file = self.filename_from_id('wireguard.keys')
keylog_file = result_file('wireguard.keys')
args += ['-owg.keylog_file:%s' % keylog_file]
with open(keylog_file, 'w') as f:
f.write("\n".join(keylog))

View File

@ -26,12 +26,13 @@ class _dissection_validator_real:
unacceptable overhead during execution of the unittests.
'''
def __init__(self, protocol, request, cmd_tshark, cmd_text2pcap):
def __init__(self, protocol, request, cmd_tshark, cmd_text2pcap, result_file):
self.dissection_list = []
self.protocol = protocol
self.cmd_tshark = cmd_tshark
self.cmd_text2pcap = cmd_text2pcap
self.test_case = request.instance
self.result_file = result_file
def add_dissection(self, byte_list, expected_result, line_no=None):
'''Adds a byte bundle and an expected result to the set of byte
@ -61,8 +62,8 @@ class _dissection_validator_real:
'''Processes and verifies all added byte bundles and their expected
results. At the end of processing the current set is emptied.'''
text_file = self.test_case.filename_from_id('txt')
pcap_file = self.test_case.filename_from_id('pcap')
text_file = self.result_file('txt')
pcap_file = self.result_file('pcap')
# create our text file of hex encoded messages
with open(text_file, 'w') as f:
@ -104,14 +105,15 @@ class _dissection_validator_real:
@fixtures.fixture
def dissection_validator(request, cmd_tshark, cmd_text2pcap):
def dissection_validator(request, cmd_tshark, cmd_text2pcap, result_file):
def generate_validator(protocol):
retval = _dissection_validator_real(
protocol,
request,
cmd_tshark,
cmd_text2pcap)
cmd_text2pcap,
result_file)
return retval
return generate_validator


@ -138,11 +138,11 @@ def check_pcapng_dsb_fields(request, cmd_tshark):
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_fileformat_pcapng_dsb(subprocesstest.SubprocessTestCase):
def test_pcapng_dsb_1(self, cmd_tshark, dirs, capture_file, check_pcapng_dsb_fields):
def test_pcapng_dsb_1(self, cmd_tshark, dirs, capture_file, result_file, check_pcapng_dsb_fields):
'''Check that DSBs are preserved while rewriting files.'''
dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
outfile = self.filename_from_id('tls12-dsb-same.pcapng')
outfile = result_file('tls12-dsb-same.pcapng')
self.assertRun((cmd_tshark,
'-r', capture_file('tls12-dsb.pcapng'),
'-w', outfile,
@ -156,10 +156,10 @@ class case_fileformat_pcapng_dsb(subprocesstest.SubprocessTestCase):
(0x544c534b, len(dsb2_contents), dsb2_contents),
))
def test_pcapng_dsb_2(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
def test_pcapng_dsb_2(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields):
'''Insert a single DSB into a pcapng file.'''
key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
outfile = self.filename_from_id('dhe1-dsb.pcapng')
outfile = result_file('dhe1-dsb.pcapng')
self.assertRun((cmd_editcap,
'--inject-secrets', 'tls,%s' % key_file,
capture_file('dhe1.pcapng.gz'), outfile
@ -170,11 +170,11 @@ class case_fileformat_pcapng_dsb(subprocesstest.SubprocessTestCase):
(0x544c534b, len(keylog_contents), keylog_contents),
))
def test_pcapng_dsb_3(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
def test_pcapng_dsb_3(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields):
'''Insert two DSBs into a pcapng file.'''
key_file1 = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
key_file2 = os.path.join(dirs.key_dir, 'http2-data-reassembly.keys')
outfile = self.filename_from_id('dhe1-dsb.pcapng')
outfile = result_file('dhe1-dsb.pcapng')
self.assertRun((cmd_editcap,
'--inject-secrets', 'tls,%s' % key_file1,
'--inject-secrets', 'tls,%s' % key_file2,
@ -189,12 +189,12 @@ class case_fileformat_pcapng_dsb(subprocesstest.SubprocessTestCase):
(0x544c534b, len(keylog2_contents), keylog2_contents),
))
def test_pcapng_dsb_4(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
def test_pcapng_dsb_4(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields):
'''Insert a single DSB into a pcapng file with existing DSBs.'''
dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
outfile = self.filename_from_id('tls12-dsb-extra.pcapng')
outfile = result_file('tls12-dsb-extra.pcapng')
self.assertRun((cmd_editcap,
'--inject-secrets', 'tls,%s' % key_file,
capture_file('tls12-dsb.pcapng'), outfile
@ -214,11 +214,11 @@ class case_fileformat_pcapng_dsb(subprocesstest.SubprocessTestCase):
(0x544c534b, len(dsb2_contents), dsb2_contents),
))
def test_pcapng_dsb_bad_key(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
def test_pcapng_dsb_bad_key(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields):
'''Insertion of a RSA key file is not very effective.'''
rsa_keyfile = os.path.join(dirs.key_dir, 'rsasnakeoil2.key')
p12_keyfile = os.path.join(dirs.key_dir, 'key.p12')
outfile = self.filename_from_id('rsasnakeoil2-dsb.pcapng')
outfile = result_file('rsasnakeoil2-dsb.pcapng')
proc = self.assertRun((cmd_editcap,
'--inject-secrets', 'tls,%s' % rsa_keyfile,
'--inject-secrets', 'tls,%s' % p12_keyfile,

View File

@ -25,12 +25,12 @@ def io_baseline_str(dirs):
return f.read()
def check_io_4_packets(self, capture_file, cmd=None, from_stdin=False, to_stdout=False):
def check_io_4_packets(self, capture_file, result_file, cmd=None, from_stdin=False, to_stdout=False):
# Test direct->direct, stdin->direct, and direct->stdout file I/O.
# Similar to suite_capture.check_capture_10_packets and
# suite_capture.check_capture_stdin.
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
if from_stdin and to_stdout:
# XXX If we support this, should we bother with separate stdin->direct
# and direct->stdout tests?
@ -57,29 +57,29 @@ def check_io_4_packets(self, capture_file, cmd=None, from_stdin=False, to_stdout
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_io(subprocesstest.SubprocessTestCase):
def test_tshark_io_stdin_direct(self, cmd_tshark, capture_file):
def test_tshark_io_stdin_direct(self, cmd_tshark, capture_file, result_file):
'''Read from stdin and write direct using TShark'''
check_io_4_packets(self, capture_file, cmd=cmd_tshark, from_stdin=True)
check_io_4_packets(self, capture_file, result_file, cmd=cmd_tshark, from_stdin=True)
def test_tshark_io_direct_stdout(self, cmd_tshark, capture_file):
def test_tshark_io_direct_stdout(self, cmd_tshark, capture_file, result_file):
'''Read direct and write to stdout using TShark'''
check_io_4_packets(self, capture_file, cmd=cmd_tshark, to_stdout=True)
check_io_4_packets(self, capture_file, result_file, cmd=cmd_tshark, to_stdout=True)
def test_tshark_io_direct_direct(self, cmd_tshark, capture_file):
def test_tshark_io_direct_direct(self, cmd_tshark, capture_file, result_file):
'''Read direct and write direct using TShark'''
check_io_4_packets(self, capture_file, cmd=cmd_tshark)
check_io_4_packets(self, capture_file, result_file, cmd=cmd_tshark)
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_rawshark_io(subprocesstest.SubprocessTestCase):
@unittest.skipUnless(sys.byteorder == 'little', 'Requires a little endian system')
def test_rawshark_io_stdin(self, cmd_rawshark, capture_file, io_baseline_str):
def test_rawshark_io_stdin(self, cmd_rawshark, capture_file, result_file, io_baseline_str):
'''Read from stdin using Rawshark'''
# tail -c +25 "${CAPTURE_DIR}dhcp.pcap" | $RAWSHARK -dencap:1 -R "udp.port==68" -nr - > $IO_RAWSHARK_DHCP_PCAP_TESTOUT 2> /dev/null
# diff -u --strip-trailing-cr $IO_RAWSHARK_DHCP_PCAP_BASELINE $IO_RAWSHARK_DHCP_PCAP_TESTOUT > $DIFF_OUT 2>&1
capture_file = capture_file('dhcp.pcap')
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
raw_dhcp_cmd = subprocesstest.cat_dhcp_command('raw')
rawshark_cmd = '{0} | "{1}" -r - -n -dencap:1 -R "udp.port==68"'.format(raw_dhcp_cmd, cmd_rawshark)
rawshark_proc = self.assertRun(rawshark_cmd, shell=True)


@ -32,7 +32,7 @@ file_type_to_testout = {
# arg 4 = number of IDBs generated
# arg 5 = number of file packets merged
# arg 6 = number of some IDB packets merged
def check_mergecap(self, mergecap_proc, file_type, encapsulation, tot_packets, generated_idbs, idb_packets):
def check_mergecap(self, mergecap_proc, file_type, encapsulation, tot_packets, generated_idbs, idb_packets, result_file):
mergecap_returncode = mergecap_proc.returncode
self.assertEqual(mergecap_returncode, 0)
if mergecap_returncode != 0:
@ -45,7 +45,7 @@ def check_mergecap(self, mergecap_proc, file_type, encapsulation, tot_packets, g
self.assertTrue(file_type in file_type_to_descr, 'Invalid file type')
testout_file = self.filename_from_id(file_type_to_testout[file_type])
testout_file = result_file(file_type_to_testout[file_type])
capinfos_testout = self.getCaptureInfo(capinfos_args=('-t', '-E', '-I', '-c'), cap_file=testout_file)
file_descr = file_type_to_descr[file_type]
@ -73,142 +73,142 @@ def check_mergecap(self, mergecap_proc, file_type, encapsulation, tot_packets, g
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_mergecap_pcap(subprocesstest.SubprocessTestCase):
def test_mergecap_basic_1_pcap_pcap(self, cmd_mergecap, capture_file):
def test_mergecap_basic_1_pcap_pcap(self, cmd_mergecap, capture_file, result_file):
'''Merge a single pcap file to pcap'''
# $MERGECAP -vF pcap -w testout.pcap "${CAPTURE_DIR}dhcp.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-F', 'pcap',
'-w', testout_file,
capture_file('dhcp.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 4, 1, 4)
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 4, 1, 4, result_file)
def test_mergecap_basic_2_pcap_pcap(self, cmd_mergecap, capture_file):
def test_mergecap_basic_2_pcap_pcap(self, cmd_mergecap, capture_file, result_file):
'''Merge two pcap files to pcap'''
# $MERGECAP -vF pcap -w testout.pcap "${CAPTURE_DIR}dhcp.pcap" "${CAPTURE_DIR}dhcp.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-F', 'pcap',
'-w', testout_file,
capture_file('dhcp.pcap'), capture_file('dhcp.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 8, 1, 8)
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 8, 1, 8, result_file)
def test_mergecap_basic_3_empty_pcap_pcap(self, cmd_mergecap, capture_file):
def test_mergecap_basic_3_empty_pcap_pcap(self, cmd_mergecap, capture_file, result_file):
'''Merge three pcap files to pcap, two empty'''
# $MERGECAP -vF pcap -w testout.pcap "${CAPTURE_DIR}empty.pcap" "${CAPTURE_DIR}dhcp.pcap" "${CAPTURE_DIR}empty.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-F', 'pcap',
'-w', testout_file,
capture_file('empty.pcap'), capture_file('dhcp.pcap'), capture_file('empty.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 4, 1, 4)
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 4, 1, 4, result_file)
def test_mergecap_basic_2_nano_pcap_pcap(self, cmd_mergecap, capture_file):
def test_mergecap_basic_2_nano_pcap_pcap(self, cmd_mergecap, capture_file, result_file):
'''Merge two pcap files to pcap, one with nanosecond timestamps'''
# $MERGECAP -vF pcap -w testout.pcap "${CAPTURE_DIR}dhcp-nanosecond.pcap" "${CAPTURE_DIR}rsasnakeoil2.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-F', 'pcap',
'-w', testout_file,
capture_file('dhcp-nanosecond.pcap'), capture_file('rsasnakeoil2.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 62, 1, 62)
check_mergecap(self, mergecap_proc, 'pcap', 'Ethernet', 62, 1, 62, result_file)
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_mergecap_pcapng(subprocesstest.SubprocessTestCase):
def test_mergecap_basic_1_pcap_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_basic_1_pcap_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge a single pcap file to pcapng'''
# $MERGECAP -v -w testout.pcap "${CAPTURE_DIR}dhcp.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-w', testout_file,
capture_file('dhcp.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 4, 1, 4)
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 4, 1, 4, result_file)
def test_mergecap_basic_2_pcap_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_basic_2_pcap_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge two pcap files to pcapng'''
# $MERGECAP -v -w testout.pcap "${CAPTURE_DIR}dhcp.pcap" "${CAPTURE_DIR}dhcp.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-w', testout_file,
capture_file('dhcp.pcap'), capture_file('dhcp.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 1, 8)
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 1, 8, result_file)
def test_mergecap_basic_2_pcap_none_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_basic_2_pcap_none_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge two pcap files to pcapng, "none" merge mode'''
# $MERGECAP -vI 'none' -w testout.pcap "${CAPTURE_DIR}dhcp.pcap" "${CAPTURE_DIR}dhcp.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-I', 'none',
'-w', testout_file,
capture_file('dhcp.pcap'), capture_file('dhcp.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 2, 4)
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 2, 4, result_file)
def test_mergecap_basic_2_pcap_all_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_basic_2_pcap_all_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge two pcap files to pcapng, "all" merge mode'''
# $MERGECAP -vI 'all' -w testout.pcap "${CAPTURE_DIR}dhcp.pcap" "${CAPTURE_DIR}dhcp.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-I', 'all',
'-w', testout_file,
capture_file('dhcp.pcap'), capture_file('dhcp.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 1, 8)
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 1, 8, result_file)
def test_mergecap_basic_2_pcap_any_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_basic_2_pcap_any_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge two pcap files to pcapng, "any" merge mode'''
# $MERGECAP -vI 'any' -w testout.pcap "${CAPTURE_DIR}dhcp.pcap" "${CAPTURE_DIR}dhcp.pcap" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-I', 'any',
'-w', testout_file,
capture_file('dhcp.pcap'), capture_file('dhcp.pcap'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 1, 8)
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 8, 1, 8, result_file)
def test_mergecap_basic_1_pcapng_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_basic_1_pcapng_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge a single pcapng file to pcapng'''
# $MERGECAP -v -w testout.pcap "${CAPTURE_DIR}dhcp.pcapng" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-w', testout_file,
capture_file('dhcp.pcapng'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 4, 1, 4)
check_mergecap(self, mergecap_proc, 'pcapng', 'Ethernet', 4, 1, 4, result_file)
def test_mergecap_1_pcapng_many_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_1_pcapng_many_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge one pcapng file with many interfaces to pcapng'''
# $MERGECAP -v -w testout.pcap "${CAPTURE_DIR}many_interfaces.pcapng.1" > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-w', testout_file,
capture_file('many_interfaces.pcapng.1'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 64, 11, 62)
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 64, 11, 62, result_file)
def test_mergecap_3_pcapng_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_3_pcapng_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge multiple pcapng files with many interfaces to pcapng'''
# $MERGECAP -v -w testout.pcap "${CAPTURE_DIR}"many_interfaces.pcapng* > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-w', testout_file,
@ -216,12 +216,12 @@ class case_mergecap_pcapng(subprocesstest.SubprocessTestCase):
capture_file('many_interfaces.pcapng.2'),
capture_file('many_interfaces.pcapng.3'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 88, 11, 86)
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 88, 11, 86, result_file)
def test_mergecap_3_pcapng_none_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_3_pcapng_none_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge multiple pcapng files with many interfaces to pcapng, "none" merge mode'''
# $MERGECAP -vI 'none' -w testout.pcap "${CAPTURE_DIR}"many_interfaces.pcapng* > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-I', 'none',
@ -230,13 +230,13 @@ class case_mergecap_pcapng(subprocesstest.SubprocessTestCase):
capture_file('many_interfaces.pcapng.2'),
capture_file('many_interfaces.pcapng.3'),
))
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 88, 33, 62)
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 88, 33, 62, result_file)
def test_mergecap_3_pcapng_all_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_3_pcapng_all_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge multiple pcapng files to pcapng in "none" mode, then merge that to "all" mode.'''
# build a pcapng of all the interfaces repeated by using mode 'none'
# $MERGECAP -vI 'none' -w testin.pcap "${CAPTURE_DIR}"many_interfaces.pcapng* > testout.txt 2>&1
testin_file = self.filename_from_id('testin.pcapng')
testin_file = result_file('testin.pcapng')
self.assertRun((cmd_mergecap,
'-V',
'-I', 'none',
@ -249,7 +249,7 @@ class case_mergecap_pcapng(subprocesstest.SubprocessTestCase):
# and use that generated pcap for our test
# $MERGECAP -vI 'all' -w testout.pcap ./testin.pcap ./testin.pcap ./testin.pcap > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-I', 'all',
@ -257,13 +257,13 @@ class case_mergecap_pcapng(subprocesstest.SubprocessTestCase):
testin_file, testin_file, testin_file,
))
# check for 33 IDBs, 88*3=264 total pkts, 62*3=186 in first IDB
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 264, 33, 186)
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 264, 33, 186, result_file)
def test_mergecap_3_pcapng_any_pcapng(self, cmd_mergecap, capture_file):
def test_mergecap_3_pcapng_any_pcapng(self, cmd_mergecap, capture_file, result_file):
'''Merge multiple pcapng files to pcapng in "none" mode, then merge that to "all" mode.'''
# build a pcapng of all the interfaces repeated by using mode 'none'
# $MERGECAP -vI 'none' -w testin.pcap "${CAPTURE_DIR}"many_interfaces.pcapng* > testout.txt 2>&1
testin_file = self.filename_from_id('testin.pcapng')
testin_file = result_file('testin.pcapng')
self.assertRun((cmd_mergecap,
'-V',
'-I', 'none',
@ -276,7 +276,7 @@ class case_mergecap_pcapng(subprocesstest.SubprocessTestCase):
# and use that generated pcap for our test
# $MERGECAP -vI 'any' -w testout.pcap ./testin.pcap ./testin.pcap ./testin.pcap > testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcapng)
testout_file = result_file(testout_pcapng)
mergecap_proc = self.assertRun((cmd_mergecap,
'-V',
'-I', 'any',
@ -284,4 +284,4 @@ class case_mergecap_pcapng(subprocesstest.SubprocessTestCase):
testin_file, testin_file, testin_file,
))
# check for 11 IDBs, 88*3=264 total pkts, 86*3=258 in first IDB
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 264, 11, 258)
check_mergecap(self, mergecap_proc, 'pcapng', 'Per packet', 264, 11, 258, result_file)


@ -92,7 +92,7 @@ def compare_capinfos_info(self, cii1, cii2, filename1, filename2):
self.fail('text2pcap output file differs from input file.')
@fixtures.fixture
def check_text2pcap(cmd_tshark, cmd_text2pcap, capture_file):
def check_text2pcap(cmd_tshark, cmd_text2pcap, capture_file, result_file):
def check_text2pcap_real(self, cap_filename, file_type, expected_packets=None, expected_datasize=None):
# Perform the following actions
# - Get information for the input pcap file with capinfos
@ -119,7 +119,7 @@ def check_text2pcap(cmd_tshark, cmd_text2pcap, capture_file):
# text2pcap_generate_input()
# $TSHARK -o 'gui.column.format:"Time","%t"' -tad -P -x -r $1 > testin.txt
testin_file = self.filename_from_id(testin_txt)
testin_file = result_file(testin_txt)
tshark_cmd = '{cmd} -r {cf} -o gui.column.format:"Time","%t" -t ad -P --hexdump frames > {of}'.format(
cmd = cmd_tshark,
cf = cap_file,
@ -128,7 +128,7 @@ def check_text2pcap(cmd_tshark, cmd_text2pcap, capture_file):
self.assertRun(tshark_cmd, shell=True)
testout_fname = file_type_to_testout[file_type]
testout_file = self.filename_from_id(testout_fname)
testout_file = result_file(testout_fname)
# The first word is the file type (the rest might be compression info)
filetype_flag = pre_cap_info['filetype'].split()[0]
# We want the -a flag, because the tshark -x format is a hex+ASCII
@ -263,10 +263,10 @@ def check_rawip(run_text2pcap_capinfos_tshark, request):
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_text2pcap_parsing(subprocesstest.SubprocessTestCase):
def test_text2pcap_eol_hash(self, cmd_text2pcap, capture_file):
def test_text2pcap_eol_hash(self, cmd_text2pcap, capture_file, result_file):
'''Test text2pcap hash sign at the end-of-line.'''
txt_fname = 'text2pcap_hash_eol.txt'
testout_file = self.filename_from_id(testout_pcap)
testout_file = result_file(testout_pcap)
self.assertRun((cmd_text2pcap,
'-F', 'pcapng',
'-t', '%Y-%m-%d %H:%M:%S.',
@ -338,11 +338,11 @@ class case_text2pcap_parsing(subprocesstest.SubprocessTestCase):
@fixtures.fixture
def run_text2pcap_capinfos_tshark(cmd_text2pcap, cmd_tshark, request):
def run_text2pcap_capinfos_tshark(cmd_text2pcap, cmd_tshark, request, result_file):
def run_text2pcap_capinfos_tshark_real(content, args):
test = request.instance
testin_file = test.filename_from_id(testin_txt)
testout_file = test.filename_from_id(testout_pcap)
testin_file = result_file(testin_txt)
testout_file = result_file(testout_pcap)
with open(testin_file, "w") as f:
f.write(content)
@ -442,7 +442,7 @@ class case_text2pcap_ipv4(subprocesstest.SubprocessTestCase):
@fixtures.fixture
def run_text2pcap_ipv6(cmd_tshark, run_text2pcap_capinfos_tshark, request):
def run_text2pcap_ipv6(cmd_tshark, run_text2pcap_capinfos_tshark, request, result_file):
self = request.instance
def run_text2pcap_ipv6_real(content, text2pcap_args, tshark_args = ()):
#Run the common text2pcap tests
@ -451,7 +451,7 @@ def run_text2pcap_ipv6(cmd_tshark, run_text2pcap_capinfos_tshark, request):
#Decode the output pcap in JSON format
self.assertRun((cmd_tshark, '-T', 'json',
'-r', self.filename_from_id(testout_pcap)) + tshark_args)
'-r', result_file(testout_pcap)) + tshark_args)
data = json.loads(self.processes[-1].stdout_str)
#Add IPv6 payload length and payload length tree to the result dict
@ -554,10 +554,10 @@ class case_text2pcap_i_proto(subprocesstest.SubprocessTestCase):
@fixtures.uses_fixtures
class case_text2pcap_other_options(subprocesstest.SubprocessTestCase):
'''Test other command line options'''
def test_text2pcap_option_N(self, cmd_text2pcap, cmd_tshark, capture_file):
def test_text2pcap_option_N(self, cmd_text2pcap, cmd_tshark, capture_file, result_file):
'''Test -N <intf-name> option'''
testin_file = self.filename_from_id(testin_txt)
testout_file = self.filename_from_id(testout_pcapng)
testin_file = result_file(testin_txt)
testout_file = result_file(testout_pcapng)
with open(testin_file, 'w') as f:
f.write("0000 00\n")


@ -61,26 +61,6 @@ class case_unittests(subprocesstest.SubprocessTestCase):
'''fieldcount'''
self.assertRun((cmd_tshark, '-G', 'fieldcount'), env=test_env)
def test_unit_ctest_coverage(self, all_test_groups):
'''Make sure CTest runs all of our tests.'''
with open(os.path.join(os.path.dirname(__file__), '..', 'CMakeLists.txt')) as cml_fd:
group_re = re.compile(r'set *\( *_test_group_list')
in_list = False
cml_groups = []
for cml_line in cml_fd:
if group_re.search(cml_line):
in_list = True
continue
if in_list:
if ')' in cml_line:
break
cml_groups.append(cml_line.strip())
cml_groups.sort()
if not all_test_groups == cml_groups:
diff = '\n'.join(list(difflib.unified_diff(all_test_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups')))
self.fail("CMakeLists.txt doesn't test all available groups:\n" + diff)
class Proto:
"""Data for a protocol."""
def __init__(self, line):


@ -45,8 +45,8 @@ def check_lua_script(cmd_tshark, features, dirs, capture_file):
return check_lua_script_real
@fixtures.fixture(scope='session')
def check_lua_script_verify(check_lua_script):
@fixtures.fixture
def check_lua_script_verify(check_lua_script, result_file):
def check_lua_script_verify_real(self, lua_script, cap_file, check_stage_1=False, heur_regmode=None):
# First run tshark with the dissector script.
if heur_regmode is None:
@ -60,7 +60,7 @@ def check_lua_script_verify(check_lua_script):
)
# then dump tshark's output to a verification file.
verify_file = self.filename_from_id('testin.txt')
verify_file = result_file('testin.txt')
with open(verify_file, 'w', newline='\n') as f:
f.write(tshark_proc.stdout_str)
@ -153,10 +153,10 @@ class case_wslua(subprocesstest.SubprocessTestCase):
self.diffOutput(lua_out, tshark_out, 'tshark + lua script', 'tshark only')
def test_wslua_file_writer(self, check_lua_script, capture_file):
def test_wslua_file_writer(self, check_lua_script, capture_file, result_file):
'''wslua file writer'''
cap_file_1 = capture_file(dhcp_pcap)
cap_file_2 = self.filename_from_id('lua_writer.pcap')
cap_file_2 = result_file('lua_writer.pcap')
# Generate a new capture file using the Lua writer.
check_lua_script(self, 'pcap_file.lua', cap_file_1, False,
@ -165,10 +165,10 @@ class case_wslua(subprocesstest.SubprocessTestCase):
)
self.assertTrue(filecmp.cmp(cap_file_1, cap_file_2), cap_file_1 + ' differs from ' + cap_file_2)
def test_wslua_file_acme_reader(self, check_lua_script, cmd_tshark, capture_file):
def test_wslua_file_acme_reader(self, check_lua_script, cmd_tshark, capture_file, result_file):
'''wslua acme file reader'''
cap_file = self.filename_from_id('lua_acme_reader.pcap')
cap_file = result_file('lua_acme_reader.pcap')
# Read an acme sipmsg.log using the acme Lua reader, writing it out as pcapng.
check_lua_script(self, 'acme_file.lua', sipmsg_log, False,
'-w', cap_file,


@ -1,143 +0,0 @@
#!/usr/bin/env python3
#
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Main test script'''
# To do:
# - Avoid printing Python tracebacks when we assert? It looks like we'd need
# to override unittest.TextTestResult.addFailure().
import argparse
import codecs
import os.path
import suite_external
import sys
import unittest
import fixtures
# Required to make fixtures available to tests!
import fixtures_ws
_all_test_groups = None
@fixtures.fixture(scope='session')
def all_test_groups():
return _all_test_groups
def find_test_ids(suite, all_ids):
if hasattr(suite, '__iter__'):
for s in suite:
find_test_ids(s, all_ids)
else:
all_ids.append(suite.id())
def main():
parser = argparse.ArgumentParser(description='Wireshark unit tests')
cap_group = parser.add_mutually_exclusive_group()
cap_group.add_argument('-E', '--disable-capture', action='store_true', help='Disable capture tests')
release_group = parser.add_mutually_exclusive_group()
release_group.add_argument('--enable-release', action='store_true', help='Enable release tests')
parser.add_argument('-p', '--program-path', default=os.path.curdir, help='Path to Wireshark executables.')
parser.add_argument('-x', '--add-external-tests', action='append', help='Path to an external test definition (.json) file.')
parser.add_argument('--skip-missing-programs',
help='Skip tests that lack programs from this list instead of failing'
' them. Use "all" to ignore all missing programs.')
list_group = parser.add_mutually_exclusive_group()
list_group.add_argument('-l', '--list', action='store_true', help='List tests. One of "all" or a full or partial test name.')
list_group.add_argument('--list-suites', action='store_true', help='List all suites.')
list_group.add_argument('--list-groups', action='store_true', help='List all suites and groups.')
list_group.add_argument('--list-cases', action='store_true', help='List all suites, groups, and cases.')
parser.add_argument('-v', '--verbose', action='store_const', const=2, default=1, help='Verbose tests.')
parser.add_argument('tests_to_run', nargs='*', metavar='test', default=['all'], help='Tests to run. One of "all" or a full or partial test name. Default is "all".')
args = parser.parse_args()
# XXX This should be a fixture.
suite_external.add_external_configs(args.add_external_tests)
all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*')
all_ids = []
find_test_ids(all_tests, all_ids)
run_ids = []
for tid in all_ids:
for ttr in args.tests_to_run:
ttrl = ttr.lower()
if ttrl == 'all':
run_ids = all_ids
break
if ttrl in tid.lower():
run_ids.append(tid)
if not run_ids:
print('No tests found. You asked for:\n ' + '\n '.join(args.tests_to_run))
parser.print_usage()
sys.exit(1)
if args.list:
print('\n'.join(run_ids))
sys.exit(0)
all_suites = set()
for aid in all_ids:
aparts = aid.split('.')
all_suites |= {aparts[0]}
all_suites = sorted(all_suites)
all_groups = set()
for aid in all_ids:
aparts = aid.split('.')
if aparts[1].startswith('group_'):
all_groups |= {'.'.join(aparts[:2])}
else:
all_groups |= {aparts[0]}
all_groups = sorted(all_groups)
global _all_test_groups
_all_test_groups = all_groups
if args.list_suites:
print('\n'.join(all_suites))
sys.exit(0)
if args.list_groups:
print('\n'.join(all_groups))
sys.exit(0)
if args.list_cases:
cases = set()
for rid in run_ids:
rparts = rid.split('.')
cases |= {'.'.join(rparts[:2])}
print('\n'.join(list(cases)))
sys.exit(0)
if codecs.lookup(sys.stdout.encoding).name != 'utf-8':
import locale
sys.stderr.write('Warning: Output encoding is {0} and not utf-8.\n'.format(sys.stdout.encoding))
sys.stdout = codecs.getwriter(locale.getpreferredencoding())(sys.stdout.buffer, 'backslashreplace')
sys.stderr = codecs.getwriter(locale.getpreferredencoding())(sys.stderr.buffer, 'backslashreplace')
run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids)
runner = unittest.TextTestRunner(verbosity=args.verbose)
# for unittest compatibility (not needed with pytest)
fixtures_ws.fixtures.create_session(args)
try:
test_result = runner.run(run_suite)
finally:
# for unittest compatibility (not needed with pytest)
fixtures_ws.fixtures.destroy_session()
if test_result.errors:
sys.exit(2)
if test_result.failures:
sys.exit(1)
if __name__ == '__main__':
main()