Test: Add dftest to our tests.

Move the dfilter tests and captures from tools to test.

Change-Id: I2e6a6cc1d383c985ba07c76c93ae1c57d3c8f84c
Reviewed-on: https://code.wireshark.org/review/27339
Petri-Dish: Gerald Combs <gerald@wireshark.org>
Tested-by: Petri Dish Buildbot
Reviewed-by: Gerald Combs <gerald@wireshark.org>
This commit is contained in:
Gerald Combs 2018-05-03 12:05:12 -07:00
parent 8db1616ec3
commit 7591ed848e
33 changed files with 100 additions and 206 deletions

View File

@ -203,6 +203,7 @@ def setUpTestEnvironment():
# Set up our environment
test_env = os.environ.copy()
test_env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = '1'
test_env['TZ'] = 'UTC'
test_env[home_env] = home_path
def setUpUatFile(conf_file):

View File

@ -221,21 +221,32 @@ class SubprocessTestCase(unittest.TestCase):
got_num_packets = True
self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
def countOutput(self, search_pat, proc=None):
def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
'''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
match_count = 0
self.assertTrue(count_stdout or count_stderr, 'No output to count.')
if proc is None:
proc = self.processes[-1]
# We might want to let the caller decide what we're searching.
out_data = proc.stdout_str + proc.stderr_str
out_data = u''
if count_stdout:
out_data = proc.stdout_str
if count_stderr:
out_data += proc.stderr_str
if search_pat is None:
return len(out_data.splitlines())
search_re = re.compile(search_pat)
for line in out_data.splitlines():
if search_re.search(line):
match_count += 1
return match_count
def grepOutput(self, search_pat, proc=None):
return self.countOutput(search_pat, proc) > 0
return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
def diffOutput(self, blob_a, blob_b, *args, **kwargs):
'''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.

View File

@ -166,6 +166,7 @@ class case_tshark_dump_glossaries(subprocesstest.SubprocessTestCase):
except:
pass
self.assertRun((config.cmd_tshark, '-G', glossary))
self.assertEqual(self.countOutput(count_stdout=False, count_stderr=True), 0, 'Found error output while printing glossary ' + glossary)
def test_tshark_glossary_valid_utf8(self):
for glossary in glossaries:

View File

@ -0,0 +1,14 @@
#
# Copyright (C) 2013 by Gilbert Ramirez <gram@alumni.rice.edu>
#
# SPDX-License-Identifier: GPL-2.0-or-later
import os.path
import unittest
# Run by unittest.defaultTestLoader.discover in test.py
def load_tests(loader, standard_tests, pattern):
this_dir = os.path.dirname(__file__)
package_tests = loader.discover(start_dir=this_dir, pattern='group_*.py')
standard_tests.addTests(package_tests)
return standard_tests

View File

@ -0,0 +1,36 @@
# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu>
#
# SPDX-License-Identifier: GPL-2.0-or-later
import config
import os.path
import subprocesstest
class DFTestCase(subprocesstest.SubprocessTestCase):
"""Base class for all tests in this dfilter-test collection."""
def runDFilter(self, dfilter, expected_return=0):
# Create the tshark command
return self.assertRun((config.cmd_tshark,
"-n", # No name resolution
"-r", # Next arg is trace file to read
os.path.join(config.capture_dir, self.trace_file),
"-Y", # packet display filter (used to be -R)
dfilter
), expected_return=expected_return)
def assertDFilterCount(self, dfilter, expected_count):
"""Run a display filter and expect a certain number of packets."""
dfilter_proc = self.runDFilter(dfilter)
dfp_count = self.countOutput()
msg = "Expected %d, got: %s" % (expected_count, dfp_count)
self.assertEqual(dfp_count, expected_count, msg)
def assertDFilterFail(self, dfilter):
"""Run a display filter and expect tshark to fail"""
dfilter_proc = self.runDFilter(dfilter, expected_return=self.exit_error)

View File

@ -2,9 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
from dftestlib import dftest
import dfiltertest
class testBytesEther(dftest.DFTest):
class case_bytes_ether(dfiltertest.DFTestCase):
trace_file = "ipx_rip.pcap"
### Note: Bytes test does not yet test FT_INT64.

View File

@ -2,9 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
from dftestlib import dftest
import dfiltertest
class testBytesIPv6(dftest.DFTest):
class case_bytes_ipv6(dfiltertest.DFTestCase):
trace_file = "ipv6.pcap"
def test_eq_1(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testBytes(dftest.DFTest):
class case_bytes_type(dfiltertest.DFTestCase):
trace_file = "arp.pcap"
def test_bytes_1(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testDouble(dftest.DFTest):
class case_double(dfiltertest.DFTestCase):
trace_file = "ntp.pcap"

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testInteger(dftest.DFTest):
class case_integer(dfiltertest.DFTestCase):
trace_file = "ntp.pcap"
def test_eq_1(self):

View File

@ -3,9 +3,9 @@
# SPDX-License-Identifier: GPL-2.0-or-later
from dftestlib import dftest
import dfiltertest
class testInteger1Byte(dftest.DFTest):
class case_integer_1_byte(dfiltertest.DFTestCase):
trace_file = "ipx_rip.pcap"

View File

@ -3,9 +3,9 @@
# SPDX-License-Identifier: GPL-2.0-or-later
from dftestlib import dftest
import dfiltertest
class testIPv4(dftest.DFTest):
class case_ipv4(dfiltertest.DFTestCase):
trace_file = "nfs.pcap"
def test_uint64_1(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testMembership(dftest.DFTest):
class case_membership(dfiltertest.DFTestCase):
trace_file = "http.pcap"
def test_membership_1_match(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testRange(dftest.DFTest):
class case_range(dfiltertest.DFTestCase):
trace_file = "ipx_rip.pcap"
def test_slice_1_pos(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testScanner(dftest.DFTest):
class case_scanner(dfiltertest.DFTestCase):
trace_file = "http.pcap"
def test_dquote_1(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testString(dftest.DFTest):
class case_string(dfiltertest.DFTestCase):
trace_file = "http.pcap"
def test_eq_1(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testStringz(dftest.DFTest):
class case_stringz(dfiltertest.DFTestCase):
trace_file = "tftp.pcap"
def test_stringz_1(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testTimeRelative(dftest.DFTest):
class case_time_relative(dfiltertest.DFTestCase):
trace_file = "nfs.pcap"
def test_relative_time_1(self):

View File

@ -2,15 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import os
import dfiltertest
from dftestlib import dftest
# Force the timezone to UTC so the checks below work regardless of what time
# zone we're in.
os.environ['TZ'] = "UTC"
class testTime(dftest.DFTest):
class case_time(dfiltertest.DFTestCase):
trace_file = "http.pcap"
def test_eq_1(self):

View File

@ -2,11 +2,10 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
import unittest
from dftestlib import dftest
class testTVB(dftest.DFTest):
class case_tvb(dfiltertest.DFTestCase):
trace_file = "http.pcap"
def test_eq_1(self):

View File

@ -2,10 +2,9 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
import dfiltertest
from dftestlib import dftest
class testUINT64(dftest.DFTest):
class case_uint64(dfiltertest.DFTestCase):
trace_file = "nfs.pcap"
def test_uint64_1(self):

View File

@ -63,7 +63,7 @@ def main():
if args.capture_interface:
config.setCaptureInterface(args.capture_interface[0])
all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*.py')
all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*')
all_ids = []
find_test_ids(all_tests, all_ids)

View File

@ -1,35 +0,0 @@
#!/usr/bin/env python
"""
Test-suite to test wireshark's dfilter mechanism.
"""
#
# Copyright (C) 2003-2013 by Gilbert Ramirez <gram@alumni.rice.edu>
#
# SPDX-License-Identifier: GPL-2.0-or-later
import os
import sys
import types
import unittest
# Import each test class so unittest.main() can find them
from dftestlib.bytes_type import testBytes
from dftestlib.bytes_ether import testBytesEther
from dftestlib.bytes_ipv6 import testBytesIPv6
from dftestlib.double import testDouble
from dftestlib.integer import testInteger
from dftestlib.integer_1byte import testInteger1Byte
from dftestlib.ipv4 import testIPv4
from dftestlib.membership import testMembership
from dftestlib.range_method import testRange
from dftestlib.scanner import testScanner
from dftestlib.string_type import testString
from dftestlib.stringz import testStringz
from dftestlib.time_type import testTime
from dftestlib.time_relative import testTimeRelative
from dftestlib.tvb import testTVB
from dftestlib.uint64 import testUINT64
if __name__ == "__main__":
unittest.main(verbosity=2)

View File

@ -1,4 +0,0 @@
#
# Copyright (C) 2013 by Gilbert Ramirez <gram@alumni.rice.edu>
#
# SPDX-License-Identifier: GPL-2.0-or-later

View File

@ -1,79 +0,0 @@
# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu>
#
# SPDX-License-Identifier: GPL-2.0-or-later
import os
import tempfile
import unittest
from dftestlib import util
# The binaries to use. We assume we are running
# from the top of the wireshark distro
TSHARK = os.path.join(os.getenv("WS_BIN_PATH", "."), "tshark")
class DFTest(unittest.TestCase):
"""Base class for all tests in this dfilter-test collection."""
# Remove these file when finished (in tearDownClass)
files_to_remove = []
@classmethod
def setUpClass(cls):
"""Create the trace file to be used in the tests."""
assert cls.trace_file
# if the class sets the 'trace_file' field, then it
# names the trace file to use for the tests. It *should*
# reside in dftestfiles
assert not os.path.isabs(cls.trace_file)
toolsdir = os.path.dirname(os.path.dirname(__file__))
cls.trace_file = os.path.join(toolsdir, "dftestfiles", cls.trace_file)
@classmethod
def tearDownClass(cls):
"""Remove the trace file used in the tests."""
for filename in cls.files_to_remove:
if os.path.exists(filename):
try:
os.remove(filename)
except OSError:
pass
def runDFilter(self, dfilter):
# Create the tshark command
cmdv = [TSHARK,
"-n", # No name resolution
"-r", # Next arg is trace file to read
self.trace_file,
"-Y", # packet display filter (used to be -R)
dfilter]
(status, output) = util.exec_cmdv(cmdv)
return status, output
def assertDFilterCount(self, dfilter, expected_count):
"""Run a display filter and expect a certain number of packets."""
(status, output) = self.runDFilter(dfilter)
# tshark must succeed
self.assertEqual(status, util.SUCCESS, output)
# Split the output (one big string) into lines, removing
# empty lines (extra newline at end of output)
lines = [L for L in output.split("\n") if L != ""]
msg = "Expected %d, got: %s" % (expected_count, output)
self.assertEqual(len(lines), expected_count, msg)
def assertDFilterFail(self, dfilter):
"""Run a display filter and expect tshark to fail"""
(status, output) = self.runDFilter(dfilter)
# tshark must succeed
self.assertNotEqual(status, util.SUCCESS, output)

View File

@ -1,34 +0,0 @@
# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu>
#
# SPDX-License-Identifier: GPL-2.0-or-later
import subprocess, sys
SUCCESS = 0
def exec_cmdv(cmdv, cwd=None, stdin=None):
"""Run the commands in cmdv, returning (retval, output),
where output is stdout and stderr combined.
If cwd is given, the child process runs in that directory.
If a filehandle is passed as stdin, it is used as stdin.
If there is an OS-level error, None is the retval."""
try:
output = subprocess.check_output(cmdv, stderr=subprocess.STDOUT,
cwd=cwd, stdin=stdin)
retval = SUCCESS
# If file isn't executable
except OSError as e:
return (None, str(e))
# If process returns non-zero
except subprocess.CalledProcessError as e:
output = e.output
retval = e.returncode
if sys.version_info[0] >= 3:
output = output.decode('utf-8')
return (retval, output)