test: finalize suite_capture conversion to fixtures, drop config.py

Convert the old start_pinging routine to use pytest fixtures, rewriting
it to enable a different generator that uses (for example) UDP.
Remove the config module since it is no longer neded.

Change-Id: Ic4727157faab084b41144e8f16ea44f59c9037d8
Reviewed-on: https://code.wireshark.org/review/30659
Petri-Dish: Peter Wu <peter@lekensteyn.nl>
Tested-by: Petri Dish Buildbot
Reviewed-by: Peter Wu <peter@lekensteyn.nl>
This commit is contained in:
Peter Wu 2018-11-16 02:28:32 +01:00
parent 3ab521118a
commit 88ce087dcf
4 changed files with 89 additions and 308 deletions

View File

@ -1,257 +0,0 @@
#
# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Configuration'''
import logging
import os
import os.path
import re
import shutil
import subprocess
import sys
import tempfile
commands = (
'capinfos',
'dumpcap',
'mergecap',
'rawshark',
'sharkd',
'text2pcap',
'tshark',
'wireshark',
)
can_capture = False
capture_interface = None
# Our executables
program_path = None
# Strings
cmd_capinfos = None
cmd_dumpcap = None
cmd_mergecap = None
cmd_rawshark = None
cmd_tshark = None
cmd_text2pcap = None
cmd_wireshark = None
# Arrays
args_ping = None
have_lua = False
have_nghttp2 = False
have_kerberos = False
have_libgcrypt16 = False
have_libgcrypt17 = False
test_env = None
program_path = None
home_path = None
conf_path = None
custom_profile_path = None
custom_profile_name = 'Custom Profile'
this_dir = os.path.dirname(__file__)
baseline_dir = os.path.join(this_dir, 'baseline')
capture_dir = os.path.join(this_dir, 'captures')
config_dir = os.path.join(this_dir, 'config')
key_dir = os.path.join(this_dir, 'keys')
lua_dir = os.path.join(this_dir, 'lua')
tools_dir = os.path.join(this_dir, '..', 'tools')
all_groups = []
def canCapture():
# XXX This appears to be evaluated at the wrong time when called
# from a unittest.skipXXX decorator.
return can_capture and capture_interface is not None
def setCanCapture(new_cc):
can_capture = new_cc
def setCaptureInterface(iface):
global capture_interface
setCanCapture(True)
capture_interface = iface
def canMkfifo():
return not sys.platform.startswith('win32')
def canDisplay():
if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
return True
# Qt requires XKEYBOARD and Xrender, which Xvnc doesn't provide.
return False
def getTsharkInfo():
global have_lua
global have_nghttp2
global have_kerberos
global have_libgcrypt16
global have_libgcrypt17
if not cmd_tshark:
logging.warning("tshark binary is not yet set")
return
try:
tshark_v = subprocess.check_output(
(cmd_tshark, '--version'),
stderr=subprocess.PIPE,
universal_newlines=True,
env=baseEnv()
).replace('\n', ' ')
except subprocess.CalledProcessError as e:
logging.warning("Failed to detect tshark features: %s", e)
tshark_v = ''
have_lua = bool(re.search('with +Lua', tshark_v))
have_nghttp2 = bool(re.search('with +nghttp2', tshark_v))
have_kerberos = bool(re.search('(with +MIT +Kerberos|with +Heimdal +Kerberos)', tshark_v))
gcry_m = re.search('with +Gcrypt +([0-9]+\.[0-9]+)', tshark_v)
have_libgcrypt16 = gcry_m and float(gcry_m.group(1)) >= 1.6
have_libgcrypt17 = gcry_m and float(gcry_m.group(1)) >= 1.7
def getDefaultCaptureInterface():
'''Choose a default capture interface for our platform. Currently Windows only.'''
global capture_interface
if capture_interface:
return
if cmd_dumpcap is None:
return
if not sys.platform.startswith('win32'):
return
try:
dumpcap_d_data = subprocess.check_output((cmd_dumpcap, '-D'), stderr=subprocess.PIPE)
dumpcap_d_stdout = dumpcap_d_data.decode('UTF-8', 'replace')
for d_line in dumpcap_d_stdout.splitlines():
iface_m = re.search('(\d+)\..*(Ethernet|Network Connection|VMware|Intel)', d_line)
if iface_m:
capture_interface = iface_m.group(1)
break
except:
pass
def getPingCommand():
'''Return an argument list required to ping www.wireshark.org for 60 seconds.'''
global args_ping
# XXX The shell script tests swept over packet sizes from 1 to 240 every 0.25 seconds.
if sys.platform.startswith('win32'):
# XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
elif sys.platform.startswith('darwin'):
args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
# XXX Other BSDs, Solaris, etc
def setProgramPath(path):
global program_path
program_path = path
retval = True
dotexe = ''
if sys.platform.startswith('win32'):
dotexe = '.exe'
for cmd in commands:
cmd_var = 'cmd_' + cmd
cmd_path = os.path.normpath(os.path.join(path, cmd + dotexe))
if not os.path.exists(cmd_path) or not os.access(cmd_path, os.X_OK):
cmd_path = None
program_path = None
retval = False
globals()[cmd_var] = cmd_path
getTsharkInfo()
getDefaultCaptureInterface()
setUpHostFiles()
return retval
def baseEnv(home=None):
"""A modified environment to ensure reproducible tests."""
env = os.environ.copy()
env['TZ'] = 'UTC'
home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
if home:
env[home_env] = home
else:
# This directory is supposed not to be written and is used by
# "readonly" tests that do not read any other preferences.
env[home_env] = "/wireshark-tests-unused"
return env
def setUpTestEnvironment():
global home_path
global conf_path
global custom_profile_path
global test_env
# Create our directories
test_confdir = tempfile.mkdtemp(prefix='wireshark-tests.')
home_path = os.path.join(test_confdir, 'home')
if sys.platform.startswith('win32'):
conf_path = os.path.join(home_path, 'Wireshark')
else:
conf_path = os.path.join(home_path, '.config', 'wireshark')
os.makedirs(conf_path)
# Test spaces while we're here.
custom_profile_path = os.path.join(conf_path, 'profiles', custom_profile_name)
os.makedirs(custom_profile_path)
# Populate our UAT files
uat_files = [
'80211_keys',
'dtlsdecrypttablefile',
'esp_sa',
'ssl_keys',
'c1222_decryption_table',
'ikev1_decryption_table',
'ikev2_decryption_table',
]
for uat in uat_files:
setUpUatFile(conf_path, uat)
# Set up our environment
test_env = baseEnv(home=home_path)
test_env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = 'True'
test_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
def setUpUatFile(conf_path, conf_file):
template = os.path.join(os.path.dirname(__file__), 'config', conf_file) + '.tmpl'
with open(template, 'r') as tplt_fd:
tplt_contents = tplt_fd.read()
tplt_fd.close()
key_dir_path = os.path.join(key_dir, '')
# uat.c replaces backslashes...
key_dir_path = key_dir_path.replace('\\', '\\x5c')
cf_contents = tplt_contents.replace('TEST_KEYS_DIR', key_dir_path)
out_file = os.path.join(conf_path, conf_file)
with open(out_file, 'w') as cf_fd:
cf_fd.write(cf_contents)
cf_fd.close()
def setUpHostFiles():
global program_path
global conf_path
global custom_profile_path
if program_path is None:
return
if conf_path is None or custom_profile_path is None:
setUpTestEnvironment()
bundle_path = os.path.join(program_path, 'Wireshark.app', 'Contents', 'MacOS')
if os.path.isdir(bundle_path):
global_path = bundle_path
else:
global_path = program_path
hosts_path_pfx = os.path.join(this_dir, 'hosts.')
shutil.copyfile(hosts_path_pfx + 'global', os.path.join(global_path, 'hosts'))
shutil.copyfile(hosts_path_pfx + 'personal', os.path.join(conf_path, 'hosts'))
shutil.copyfile(hosts_path_pfx + 'custom', os.path.join(custom_profile_path, 'hosts'))
if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
can_capture = True
# Initialize ourself.
getPingCommand()

View File

@ -17,7 +17,6 @@ import tempfile
import types
import fixtures
import config
import subprocesstest
@ -126,15 +125,14 @@ def wireshark_command(cmd_wireshark):
@fixtures.fixture(scope='session')
def features(cmd_tshark):
def features(cmd_tshark, make_env):
'''Returns an object describing available features in tshark.'''
try:
# XXX stop using config
tshark_v = subprocess.check_output(
(cmd_tshark, '--version'),
stderr=subprocess.PIPE,
universal_newlines=True,
env=config.baseEnv()
env=make_env()
)
tshark_v = re.sub(r'\s+', ' ', tshark_v)
except subprocess.CalledProcessError as ex:
@ -190,14 +188,28 @@ def conf_path(home_path):
return conf_path
@fixtures.fixture(scope='session')
def make_env():
"""A factory for a modified environment to ensure reproducible tests."""
def make_env_real(home=None):
env = os.environ.copy()
env['TZ'] = 'UTC'
home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
if home:
env[home_env] = home
else:
# This directory is supposed not to be written and is used by
# "readonly" tests that do not read any other preferences.
env[home_env] = "/wireshark-tests-unused"
return env
return make_env_real
@fixtures.fixture
def base_env(home_path, request):
def base_env(home_path, make_env, request):
"""A modified environment to ensure reproducible tests. Tests can modify
this environment as they see fit."""
env = os.environ.copy()
env['TZ'] = 'UTC'
home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
env[home_env] = home_path
env = make_env(home=home_path)
# Remove this if test instances no longer inherit from SubprocessTestCase?
if isinstance(request.instance, subprocesstest.SubprocessTestCase):
@ -207,7 +219,7 @@ def base_env(home_path, request):
@fixtures.fixture
def test_env(base_env, conf_path, request):
def test_env(base_env, conf_path, request, dirs):
'''A process environment with a populated configuration directory.'''
# Populate our UAT files
uat_files = [
@ -219,9 +231,16 @@ def test_env(base_env, conf_path, request):
'ikev1_decryption_table',
'ikev2_decryption_table',
]
# uat.c replaces backslashes...
key_dir_path = os.path.join(dirs.key_dir, '').replace('\\', '\\x5c')
for uat in uat_files:
# XXX stop using config
config.setUpUatFile(conf_path, uat)
template_file = os.path.join(dirs.config_dir, uat + '.tmpl')
out_file = os.path.join(conf_path, uat)
with open(template_file, 'r') as f:
template_contents = f.read()
cf_contents = template_contents.replace('TEST_KEYS_DIR', key_dir_path)
with open(out_file, 'w') as f:
f.write(cf_contents)
env = base_env
env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = '1'

View File

@ -23,8 +23,6 @@ import unittest
# - Add a subprocesstest.SkipUnlessCapture decorator?
# - Try to catch crashes? See the comments below in waitProcess.
# XXX This should probably be in config.py and settable from
# the command line.
process_timeout = 300 # Seconds
def cat_dhcp_command(mode):
@ -33,7 +31,8 @@ def cat_dhcp_command(mode):
sd_cmd = ''
if sys.executable:
sd_cmd = '"{}" '.format(sys.executable)
sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
this_dir = os.path.dirname(__file__)
sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode)
return sd_cmd
class LoggingPopen(subprocess.Popen):
@ -256,9 +255,6 @@ class SubprocessTestCase(unittest.TestCase):
# fixture (via a test method parameter or class decorator).
assert not (env is None and hasattr(self, '_fixture_request')), \
"Decorate class with @fixtures.mark_usefixtures('test_env')"
if env is None:
# Avoid using the test user's real environment by default.
env = config.test_env
proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
self.processes.append(proc)
return proc

View File

@ -9,7 +9,6 @@
#
'''Capture tests'''
import config
import glob
import os
import subprocess
@ -24,20 +23,48 @@ capture_duration = 5
testout_pcap = 'testout.pcap'
snapshot_len = 96
def start_pinging(self):
ping_procs = []
@fixtures.fixture
def traffic_generator():
'''
Traffic generator factory. Invoking it returns a tuple (start_func, cfilter)
where cfilter is a capture filter to match the generated traffic.
start_func can be invoked to start generating traffic and returns a function
which can be used to stop traffic generation early.
Currently calls ping www.wireshark.org for 60 seconds.
'''
# XXX replace this by something that generates UDP traffic to localhost?
# That would avoid external access which is forbidden by the Debian policy.
nprocs = 1
if sys.platform.startswith('win32'):
# Fake '-i' with a subsecond interval.
for st in (0.1, 0.1, 0):
ping_procs.append(self.startProcess(config.args_ping))
time.sleep(st)
# XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
nprocs = 3
elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
elif sys.platform.startswith('darwin'):
args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
else:
ping_procs.append(self.startProcess(config.args_ping))
return ping_procs
def stop_pinging(ping_procs):
for proc in ping_procs:
proc.kill()
# XXX Other BSDs, Solaris, etc
fixtures.skip('ping utility is unavailable - cannot generate traffic')
procs = []
def kill_processes():
for proc in procs:
proc.kill()
for proc in procs:
proc.wait()
procs.clear()
def start_processes():
for i in range(nprocs):
if i > 0:
# Fake subsecond interval if the ping utility lacks support.
time.sleep(0.1)
proc = subprocess.Popen(args_ping, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
procs.append(proc)
return kill_processes
try:
yield start_processes, 'icmp || icmp6'
finally:
kill_processes()
@fixtures.fixture(scope='session')
@ -54,14 +81,12 @@ def capture_command(*cmd_args, shell=False):
@fixtures.fixture
def check_capture_10_packets(capture_interface, cmd_dumpcap):
def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
start_traffic, cfilter = traffic_generator
def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
# Similar to suite_io.check_io_4_packets.
if not config.args_ping:
self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
ping_procs = start_pinging(self)
stop_traffic = start_traffic()
if to_stdout:
capture_proc = self.runProcess(capture_command(cmd,
'-i', '"{}"'.format(capture_interface),
@ -69,7 +94,7 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap):
'-w', '-',
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', '"icmp || icmp6"',
'-f', '"{}"'.format(cfilter),
'>', testout_file,
shell=True
),
@ -82,10 +107,10 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap):
'-w', testout_file,
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', 'icmp || icmp6',
'-f', cfilter,
))
stop_traffic()
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
if capture_returncode != 0:
self.log_fd.write('{} -D output:\n'.format(cmd))
self.runProcess((cmd, '-D'))
@ -160,13 +185,12 @@ def check_capture_stdin(cmd_dumpcap):
@fixtures.fixture
def check_capture_read_filter(capture_interface):
def check_capture_read_filter(capture_interface, traffic_generator):
start_traffic, cfilter = traffic_generator
def check_capture_read_filter_real(self, cmd=None):
if not config.args_ping:
self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
self.assertIsNotNone(cmd)
ping_procs = start_pinging(self)
testout_file = self.filename_from_id(testout_pcap)
stop_traffic = start_traffic()
capture_proc = self.runProcess(capture_command(cmd,
'-i', capture_interface,
'-p',
@ -175,10 +199,10 @@ def check_capture_read_filter(capture_interface):
'-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', 'icmp || icmp6',
'-f', cfilter,
))
stop_traffic()
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
@ -186,12 +210,11 @@ def check_capture_read_filter(capture_interface):
return check_capture_read_filter_real
@fixtures.fixture
def check_capture_snapshot_len(capture_interface, cmd_tshark):
def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
start_traffic, cfilter = traffic_generator
def check_capture_snapshot_len_real(self, cmd=None):
if not config.args_ping:
self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
self.assertIsNotNone(cmd)
ping_procs = start_pinging(self)
stop_traffic = start_traffic()
testout_file = self.filename_from_id(testout_pcap)
capture_proc = self.runProcess(capture_command(cmd,
'-i', capture_interface,
@ -199,10 +222,10 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark):
'-w', testout_file,
'-s', str(snapshot_len),
'-a', 'duration:{}'.format(capture_duration),
'-f', 'icmp || icmp6',
'-f', cfilter,
))
stop_traffic()
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
self.assertEqual(capture_returncode, 0)
self.assertTrue(os.path.isfile(testout_file))