wireshark/test/subprocesstest.py

323 lines
13 KiB
Python

#
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Subprocess test case superclass'''
import difflib
import io
import os
import os.path
import re
import subprocess
import sys
import unittest
# To do:
# - Add a subprocesstest.SkipUnlessCapture decorator?
# - Try to catch crashes? See the comments below in waitProcess.
process_timeout = 300 # Seconds
def cat_dhcp_command(mode):
'''Create a command string for dumping dhcp.pcap to stdout'''
# XXX Do this in Python in a thread?
sd_cmd = ''
if sys.executable:
sd_cmd = '"{}" '.format(sys.executable)
this_dir = os.path.dirname(__file__)
sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode)
return sd_cmd
def cat_cap_file_command(cap_files):
'''Create a command string for dumping one or more capture files to stdout'''
# XXX Do this in Python in a thread?
if isinstance(cap_files, str):
cap_files = [ cap_files ]
quoted_paths = ' '.join('"{}"'.format(cap_file) for cap_file in cap_files)
if sys.platform.startswith('win32'):
# https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-xp/bb491026(v=technet.10)
# says that the `type` command "displays the contents of a text
# file." Copy to the console instead.
return 'copy {} CON'.format(quoted_paths)
return 'cat {}'.format(quoted_paths)
class LoggingPopen(subprocess.Popen):
'''Run a process using subprocess.Popen. Capture and log its output.
Stdout and stderr are captured to memory and decoded as UTF-8. On
Windows, CRLF line endings are normalized to LF. The program command
and output is written to log_fd.
'''
def __init__(self, proc_args, *args, **kwargs):
self.log_fd = kwargs.pop('log_fd', None)
self.max_lines = kwargs.pop('max_lines', None)
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE
# Make sure communicate() gives us bytes.
kwargs['universal_newlines'] = False
self.cmd_str = 'command ' + repr(proc_args)
super().__init__(proc_args, *args, **kwargs)
self.stdout_str = ''
self.stderr_str = ''
@staticmethod
def trim_output(out_log, max_lines):
lines = out_log.splitlines(True)
if not len(lines) > max_lines * 2 + 1:
return out_log
header = lines[:max_lines]
body = lines[max_lines:-max_lines]
body = "<<< trimmed {} lines of output >>>\n".format(len(body))
footer = lines[-max_lines:]
return ''.join(header) + body + ''.join(footer)
def wait_and_log(self):
'''Wait for the process to finish and log its output.'''
out_data, err_data = self.communicate(timeout=process_timeout)
out_log = out_data.decode('UTF-8', 'replace')
if self.max_lines and self.max_lines > 0:
out_log = self.trim_output(out_log, self.max_lines)
err_log = err_data.decode('UTF-8', 'replace')
self.log_fd.flush()
self.log_fd.write('-- Begin stdout for {} --\n'.format(self.cmd_str))
self.log_fd.write(out_log)
self.log_fd.write('-- End stdout for {} --\n'.format(self.cmd_str))
self.log_fd.write('-- Begin stderr for {} --\n'.format(self.cmd_str))
self.log_fd.write(err_log)
self.log_fd.write('-- End stderr for {} --\n'.format(self.cmd_str))
self.log_fd.flush()
# Make sure our output is the same everywhere.
# Throwing a UnicodeDecodeError exception here is arguably a good thing.
self.stdout_str = out_data.decode('UTF-8', 'strict').replace('\r\n', '\n')
self.stderr_str = err_data.decode('UTF-8', 'strict').replace('\r\n', '\n')
def stop_process(self, kill=False):
'''Stop the process immediately.'''
if kill:
super().kill()
else:
super().terminate()
def terminate(self):
'''Terminate the process. Do not log its output.'''
# XXX Currently unused.
self.stop_process(kill=False)
def kill(self):
'''Kill the process. Do not log its output.'''
self.stop_process(kill=True)
class SubprocessTestCase(unittest.TestCase):
'''Run a program and gather its stdout and stderr.'''
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.exit_ok = 0
# See ws_exit_codes.h
self.exit_command_line = 1
self.exit_invalid_interface = 2
self.exit_invalid_file_error = 3
self.exit_invalid_filter_error = 4
self.exit_invalid_capability = 5
self.exit_iface_no_link_types = 6
self.exit_iface_has_no_timestamp_types = 7
self.exit_init_failed = 8
self.exit_open_error = 9
self.exit_code = None
self.log_fname = None
self.log_fd = None
self.processes = []
self.cleanup_files = []
self.dump_files = []
def log_fd_write_bytes(self, log_data):
self.log_fd.write(log_data)
def filename_from_id(self, filename):
'''Generate a filename prefixed with our test ID.'''
id_filename = self.id() + '.' + filename
if id_filename not in self.cleanup_files:
self.cleanup_files.append(id_filename)
return id_filename
def kill_processes(self):
'''Kill any processes we've opened so far'''
for proc in self.processes:
try:
proc.kill()
except Exception:
pass
def setUp(self):
"""
Set up a single test. Opens a log file and add it to the cleanup list.
"""
self.processes = []
self.log_fname = self.filename_from_id('log')
# Our command line utilities generate UTF-8. The log file endcoding
# needs to match that.
# XXX newline='\n' works for now, but we might have to do more work
# to handle line endings in the future.
self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
self.cleanup_files.append(self.log_fname)
def _last_test_failed(self):
"""Check for non-skipped tests that resulted in errors."""
# The test outcome is not available via the public unittest API, so
# check a private property, "_outcome", set by unittest.TestCase.run.
# It remains None when running in debug mode (`pytest --pdb`).
# The property is available since Python 3.4 until at least Python 3.7.
if self._outcome:
if hasattr(self._outcome, 'errors'):
# Python 3.4 - 3.10
result = self.defaultTestResult()
self._feedErrorsToResult(result, self._outcome.errors)
else:
# Python 3.11+
result = self._outcome.result
for test_case, exc_info in (result.errors + result.failures):
if exc_info:
return True
# No errors occurred or running in debug mode.
return False
def tearDown(self):
"""
Tears down a single test. Kills stray processes and closes the log file.
On errors, display the log contents. On success, remove temporary files.
"""
self.kill_processes()
self.log_fd.close()
if self._last_test_failed():
self.dump_files.append(self.log_fname)
# Leave some evidence behind.
self.cleanup_files = []
print('\nProcess output for {}:'.format(self.id()))
with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
for line in log_fd:
sys.stdout.write(line)
for filename in self.cleanup_files:
try:
os.unlink(filename)
except OSError:
pass
self.cleanup_files = []
def getCaptureInfo(self, capinfos_args=None, cap_file=None):
'''Run capinfos on a capture file and log its output.
capinfos_args must be a sequence.
Default cap_file is <test id>.testout.pcap.'''
# XXX convert users to use a new fixture instead of this function.
cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
if not cap_file:
cap_file = self.filename_from_id('testout.pcap')
self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
capinfos_cmd = [cmd_capinfos]
if capinfos_args is not None:
capinfos_cmd += capinfos_args
capinfos_cmd.append(cap_file)
capinfos_data = subprocess.check_output(capinfos_cmd)
capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
self.log_fd.write(capinfos_stdout)
return capinfos_stdout
def checkPacketCount(self, num_packets, cap_file=None):
'''Make sure a capture file contains a specific number of packets.'''
got_num_packets = False
capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
count_pat = r'Number of packets:\s+{}'.format(num_packets)
if re.search(count_pat, capinfos_testout):
got_num_packets = True
self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
'''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
match_count = 0
self.assertTrue(count_stdout or count_stderr, 'No output to count.')
if proc is None:
proc = self.processes[-1]
out_data = ''
if count_stdout:
out_data = proc.stdout_str
if count_stderr:
out_data += proc.stderr_str
if search_pat is None:
return len(out_data.splitlines())
search_re = re.compile(search_pat)
for line in out_data.splitlines():
if search_re.search(line):
match_count += 1
return match_count
def grepOutput(self, search_pat, proc=None):
return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
def diffOutput(self, blob_a, blob_b, *args, **kwargs):
'''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
blob_a and blob_b must be UTF-8 strings.'''
lines_a = blob_a.splitlines()
lines_b = blob_b.splitlines()
diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
if len(diff) > 0:
self.log_fd.flush()
self.log_fd.write('-- Begin diff output --\n')
self.log_fd.writelines(diff)
self.log_fd.write('-- End diff output --\n')
return False
return True
def startProcess(self, proc_args, stdin=None, env=None, shell=False, cwd=None, max_lines=None):
'''Start a process in the background. Returns a subprocess.Popen object.
You typically wait for it using waitProcess() or assertWaitProcess().'''
if env is None:
# Apply default test environment if no override is provided.
env = getattr(self, 'injected_test_env', None)
# Not all tests need test_env, but those that use runProcess or
# startProcess must either pass an explicit environment or load the
# fixture (via a test method parameter or class decorator).
assert not (env is None and hasattr(self, '_fixture_request')), \
"Decorate class with @fixtures.mark_usefixtures('test_env')"
proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd, cwd=cwd, max_lines=max_lines)
self.processes.append(proc)
return proc
def waitProcess(self, process):
'''Wait for a process to finish.'''
process.wait_and_log()
# XXX The shell version ran processes using a script called run_and_catch_crashes
# which looked for core dumps and printed stack traces if found. We might want
# to do something similar here. This may not be easy on modern Ubuntu systems,
# which default to using Apport: https://wiki.ubuntu.com/Apport
def assertWaitProcess(self, process, expected_return=0):
'''Wait for a process to finish and check its exit code.'''
process.wait_and_log()
self.assertEqual(process.returncode, expected_return)
def runProcess(self, args, env=None, shell=False, cwd=None, max_lines=None):
'''Start a process and wait for it to finish.'''
process = self.startProcess(args, env=env, shell=shell, cwd=cwd, max_lines=max_lines)
process.wait_and_log()
return process
def assertRun(self, args, env=None, shell=False, expected_return=0, cwd=None, max_lines=None):
'''Start a process and wait for it to finish. Check its return code.'''
process = self.runProcess(args, env=env, shell=shell, cwd=cwd, max_lines=max_lines)
self.assertEqual(process.returncode, expected_return)
return process