diff --git a/test/subprocesstest.py b/test/subprocesstest.py index 84135c8240..08aa9b5e37 100644 --- a/test/subprocesstest.py +++ b/test/subprocesstest.py @@ -33,6 +33,25 @@ class ExitCodes(enum.IntEnum): INIT_FAILED = 8 OPEN_ERROR = 9 +def run(*args, capture_output=False, stdout=None, stderr=None, encoding='utf-8', **kwargs): + ''' Wrapper for subprocess.run() that captures and decodes output.''' + + # If the user told us what to do with standard streams use that. + if capture_output or stdout or stderr: + return subprocess.run(*args, capture_output=capture_output, stdout=stdout, stderr=stderr, encoding=encoding, **kwargs) + + # If the user doesn't want to capture output try to ensure the child inherits the parents stdout and stderr on + # all platforms, so pytest can reliably capture it and do the right thing. Otherwise the child output may be interleaved + # on the console with the parent and mangle pytest's status output. + # + # Make the child inherit only the parents stdout and stderr. + return subprocess.run(*args, close_fds=True, stdout=sys.stdout, stderr=sys.stderr, encoding=encoding, **kwargs) + +def check_run(*args, **kwargs): + ''' Same as run(), also check child process returns 0 (success)''' + proc = run(*args, check=True, **kwargs) + return proc + def cat_dhcp_command(mode): '''Create a command string for dumping dhcp.pcap to stdout''' # XXX Do this in Python in a thread? diff --git a/test/suite_capture.py b/test/suite_capture.py index 4c3e78253d..88069bd0b9 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -13,6 +13,7 @@ import hashlib import os import socket import subprocess +import subprocesstest from subprocesstest import cat_dhcp_command, cat_cap_file_command, count_output, grep_output, check_packet_count import sys import threading @@ -89,7 +90,7 @@ def check_capture_10_packets(capture_interface, cmd_capinfos, traffic_generator, testout_file = result_file(testout_pcap) stop_traffic = start_traffic() if to_stdout: - capture_proc = subprocess.run(capture_command(cmd, + subprocesstest.check_run(capture_command(cmd, '-i', '"{}"'.format(capture_interface), '-p', '-w', '-', @@ -99,10 +100,9 @@ def check_capture_10_packets(capture_interface, cmd_capinfos, traffic_generator, '>', testout_file, shell=True ), - shell=True, env=env - ) + shell=True, env=env) else: - capture_proc = subprocess.run(capture_command(cmd, + subprocesstest.check_run(capture_command(cmd, '-i', capture_interface, '-p', '-w', testout_file, @@ -111,8 +111,6 @@ def check_capture_10_packets(capture_interface, cmd_capinfos, traffic_generator, '-f', cfilter, ), env=env) stop_traffic() - capture_returncode = capture_proc.returncode - assert capture_returncode == 0 check_packet_count(cmd_capinfos, 10, testout_file) return check_capture_10_packets_real @@ -136,13 +134,12 @@ def check_capture_fifo(cmd_capinfos, result_file): fifo_proc = subprocess.Popen( ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), shell=True) - capture_proc = subprocess.run(capture_command(cmd, + subprocesstest.check_run(capture_command(cmd, '-i', fifo_file, '-p', '-w', testout_file, '-a', 'duration:{}'.format(capture_duration), ), env=env) - assert capture_proc.returncode == 0 fifo_proc.kill() assert os.path.isfile(testout_file) check_packet_count(cmd_capinfos, 8, testout_file) @@ -168,12 +165,13 @@ def check_capture_stdin(cmd_capinfos, result_file): capture_cmd += ' --log-level=info' if sysconfig.get_platform().startswith('mingw'): pytest.skip('FIXME Pipes are broken with the MSYS2 shell') - pipe_proc = subprocess.run(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env) - assert pipe_proc.returncode == 0 + pipe_proc = subprocesstest.check_run(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True, capture_output=True, env=env) if is_gui: - assert grep_output('Wireshark is up and ready to go'), 'No startup message.' - assert grep_output('Capture started'), 'No capture start message.' - assert grep_output('Capture stopped'), 'No capture stop message.' + # Wireshark uses stdout and not stderr for diagnostic messages + # XXX: Confirm this + assert grep_output(pipe_proc.stdout, 'Wireshark is up and ready to go'), 'No startup message.' + assert grep_output(pipe_proc.stdout, 'Capture started'), 'No capture start message.' + assert grep_output(pipe_proc.stdout, 'Capture stopped'), 'No capture stop message.' assert os.path.isfile(testout_file) check_packet_count(cmd_capinfos, 8, testout_file) return check_capture_stdin_real @@ -182,11 +180,11 @@ def check_capture_stdin(cmd_capinfos, result_file): @pytest.fixture def check_capture_read_filter(capture_interface, traffic_generator, cmd_capinfos, result_file): start_traffic, cfilter = traffic_generator - def check_capture_read_filter_real(self, cmd=None): + def check_capture_read_filter_real(self, cmd=None, env=None): assert cmd is not None testout_file = result_file(testout_pcap) stop_traffic = start_traffic() - capture_proc = subprocess.run(capture_command(cmd, + subprocesstest.check_run(capture_command(cmd, '-i', capture_interface, '-p', '-w', testout_file, @@ -196,7 +194,6 @@ def check_capture_read_filter(capture_interface, traffic_generator, cmd_capinfos '-a', 'duration:{}'.format(capture_duration), '-f', cfilter, ), env=env) - assert capture_proc.returncode == 0 stop_traffic() check_packet_count(cmd_capinfos, 0, testout_file) return check_capture_read_filter_real @@ -208,7 +205,7 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator, assert cmd is not None stop_traffic = start_traffic() testout_file = result_file(testout_pcap) - capture_proc = subprocess.run(capture_command(cmd, + subprocesstest.check_run(capture_command(cmd, '-i', capture_interface, '-p', '-w', testout_file, @@ -216,19 +213,17 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator, '-a', 'duration:{}'.format(capture_duration), '-f', cfilter, ), env=env) - assert capture_proc.returncode == 0 stop_traffic() assert os.path.isfile(testout_file) # Use tshark to filter out all packets larger than 68 bytes. testout2_file = result_file('testout2.pcap') - filter_proc = subprocess.run((cmd_tshark, + subprocesstest.check_run((cmd_tshark, '-r', testout_file, '-w', testout2_file, '-Y', 'frame.cap_len>{}'.format(snapshot_len), ), env=env) - assert filter_proc.returncode == 0 check_packet_count(cmd_capinfos, 0, testout2_file) return check_capture_snapshot_len_real @@ -256,8 +251,7 @@ def check_dumpcap_autostop_stdin(cmd_dumpcap, cmd_capinfos, result_file): )) if sysconfig.get_platform().startswith('mingw'): pytest.skip('FIXME Pipes are broken with the MSYS2 shell') - pipe_proc = subprocess.run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env) - pipe_proc.returncode == 0 + subprocesstest.check_run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env) assert os.path.isfile(testout_file) if packets is not None: @@ -294,8 +288,7 @@ def check_dumpcap_ringbuffer_stdin(cmd_dumpcap, cmd_capinfos, result_file): )) if sysconfig.get_platform().startswith('mingw'): pytest.skip('FIXME Pipes are broken with the MSYS2 shell') - pipe_proc = subprocess.run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env) - pipe_proc.returncode == 0 + subprocesstest.check_run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env) rb_files = glob.glob(testout_glob) assert len(rb_files) == 2 @@ -411,8 +404,7 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, cmd_capinfos, capture capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args) - capture_proc = subprocess.run(capture_cmd, env=env) - capture_proc.returncode == 0 + subprocesstest.check_run(capture_cmd, env=env) for fifo_proc in fifo_procs: fifo_proc.kill() rb_files = [] @@ -452,11 +444,11 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, cmd_capinfos, capture for check_val in check_vals: check_packet_count(cmd_capinfos, check_val['packet_count'], check_val['filename']) - tshark_proc = subprocess.run(capture_command(cmd_tshark, + tshark_proc = subprocesstest.check_run(capture_command(cmd_tshark, '-r', check_val['filename'], '-V', '-X', 'read_format:MIME Files Format' - ), capture_output=True, encoding='utf-8', env=env) + ), capture_output=True, env=env) # XXX Are there any other sanity checks we should run? if idb_compare_eq: assert count_output(tshark_proc.stdout, r'Block: Interface Description Block') \ diff --git a/test/suite_clopts.py b/test/suite_clopts.py index 8bd998fac7..44f887ff3b 100644 --- a/test/suite_clopts.py +++ b/test/suite_clopts.py @@ -12,6 +12,7 @@ import json import sys import os.path import subprocess +import subprocesstest from subprocesstest import ExitCodes, grep_output, count_output import shutil import pytest @@ -27,21 +28,21 @@ class TestDumpcapOptions: def test_dumpcap_invalid_chars(self, cmd_dumpcap, base_env): '''Invalid dumpcap parameters''' for char_arg in 'CEFGHJKNOQRTUVWXYejloxz': - process = subprocess.run((cmd_dumpcap, '-' + char_arg), env=base_env) + process = subprocesstest.run((cmd_dumpcap, '-' + char_arg), env=base_env) assert process.returncode == ExitCodes.COMMAND_LINE # XXX Should we generate individual test functions instead of looping? def test_dumpcap_valid_chars(self, cmd_dumpcap, base_env): for char_arg in 'hv': - process = subprocess.run((cmd_dumpcap, '-' + char_arg), env=base_env) - assert process.returncode == 0 + process = subprocesstest.run((cmd_dumpcap, '-' + char_arg), env=base_env) + assert process.returncode == ExitCodes.OK # XXX Should we generate individual test functions instead of looping? def test_dumpcap_interface_chars(self, cmd_dumpcap, base_env): '''Valid dumpcap parameters requiring capture permissions''' valid_returns = [ExitCodes.OK, ExitCodes.INVALID_INTERFACE] for char_arg in 'DL': - process = subprocess.run((cmd_dumpcap, '-' + char_arg), env=base_env) + process = subprocesstest.run((cmd_dumpcap, '-' + char_arg), env=base_env) assert process.returncode in valid_returns @@ -51,7 +52,7 @@ class TestDumpcapClopts: invalid_filter = '__invalid_protocol' # $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 testout_file = result_file(testout_pcap) - process = subprocess.run((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file), capture_output=True, encoding='utf-8', env=base_env) + process = subprocesstest.run((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file), capture_output=True, env=base_env) assert grep_output(process.stderr, 'Invalid capture filter "' + invalid_filter + '" for interface') def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, capture_interface, result_file, base_env): @@ -59,7 +60,7 @@ class TestDumpcapClopts: invalid_interface = '__invalid_interface' # $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 testout_file = result_file(testout_pcap) - process = subprocess.run((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file), capture_output=True, encoding='utf-8', env=base_env) + process = subprocesstest.run((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file), capture_output=True, env=base_env) assert grep_output(process.stderr, 'There is no device named "__invalid_interface"') or \ grep_output(process.stderr, 'The capture session could not be initiated on capture device "__invalid_interface"') @@ -68,19 +69,19 @@ class TestDumpcapClopts: invalid_index = '0' # $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1 testout_file = result_file(testout_pcap) - process = subprocess.run((cmd_dumpcap, '-i', invalid_index, '-w', testout_file), capture_output=True, encoding='utf-8', env=base_env) + process = subprocesstest.run((cmd_dumpcap, '-i', invalid_index, '-w', testout_file), capture_output=True, env=base_env) assert grep_output(process.stderr, 'There is no interface with that adapter index') class TestBasicClopts: def test_existing_file(self, cmd_tshark, capture_file, test_env): # $TSHARK -r "${CAPTURE_DIR}dhcp.pcap" > ./testout.txt 2>&1 - process = subprocess.run((cmd_tshark, '-r', capture_file('dhcp.pcap')), env=test_env) - assert process.returncode == 0 + process = subprocesstest.run((cmd_tshark, '-r', capture_file('dhcp.pcap')), env=test_env) + assert process.returncode == ExitCodes.OK def test_nonexistent_file(self, cmd_tshark, capture_file, test_env): # $TSHARK - r ThisFileDontExist.pcap > ./testout.txt 2 > &1 - process = subprocess.run((cmd_tshark, '-r', capture_file('__ceci_nest_pas_une.pcap')), env=test_env) + process = subprocesstest.run((cmd_tshark, '-r', capture_file('__ceci_nest_pas_une.pcap')), env=test_env) assert process.returncode == ExitCodes.INVALID_FILE_ERROR @@ -89,14 +90,14 @@ class TestTsharkOptions: def test_tshark_invalid_chars(self, cmd_tshark, test_env): '''Invalid tshark parameters''' for char_arg in 'ABCEFHJKMNORTUWXYZabcdefijkmorstuwyz': - process = subprocess.run((cmd_tshark, '-' + char_arg), env=test_env) + process = subprocesstest.run((cmd_tshark, '-' + char_arg), env=test_env) assert process.returncode == ExitCodes.COMMAND_LINE # XXX Should we generate individual test functions instead of looping? def test_tshark_valid_chars(self, cmd_tshark, test_env): for char_arg in 'Ghv': - process = subprocess.run((cmd_tshark, '-' + char_arg), env=test_env) - process.returncode == 0 + process = subprocesstest.run((cmd_tshark, '-' + char_arg), env=test_env) + process.returncode == ExitCodes.OK # XXX Should we generate individual test functions instead of looping? def test_tshark_interface_chars(self, cmd_tshark, cmd_dumpcap, test_env): @@ -104,7 +105,7 @@ class TestTsharkOptions: # These options require dumpcap valid_returns = [ExitCodes.OK, ExitCodes.INVALID_CAPABILITY] for char_arg in 'DL': - process = subprocess.run((cmd_tshark, '-' + char_arg), env=test_env) + process = subprocesstest.run((cmd_tshark, '-' + char_arg), env=test_env) assert process.returncode in valid_returns @@ -114,7 +115,7 @@ class TestTsharkCaptureClopts: invalid_filter = '__invalid_protocol' # $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 testout_file = result_file(testout_pcap) - process = subprocess.run((cmd_tshark, '-f', invalid_filter, '-w', testout_file ), capture_output=True, encoding='utf-8', env=test_env) + process = subprocesstest.run((cmd_tshark, '-f', invalid_filter, '-w', testout_file ), capture_output=True, env=test_env) assert grep_output(process.stderr, 'Invalid capture filter "' + invalid_filter + '" for interface') def test_tshark_invalid_interface_name(self, cmd_tshark, capture_interface, result_file, test_env): @@ -122,7 +123,7 @@ class TestTsharkCaptureClopts: invalid_interface = '__invalid_interface' # $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 testout_file = result_file(testout_pcap) - process = subprocess.run((cmd_tshark, '-i', invalid_interface, '-w', testout_file), capture_output=True, encoding='utf-8', env=test_env) + process = subprocesstest.run((cmd_tshark, '-i', invalid_interface, '-w', testout_file), capture_output=True, env=test_env) assert grep_output(process.stderr, 'There is no device named "__invalid_interface"') or \ grep_output(process.stderr, 'The capture session could not be initiated on capture device "__invalid_interface"') @@ -131,14 +132,14 @@ class TestTsharkCaptureClopts: invalid_index = '0' # $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1 testout_file = result_file(testout_pcap) - process = subprocess.run((cmd_tshark, '-i', invalid_index, '-w', testout_file), capture_output=True, encoding='utf-8', env=test_env) + process = subprocesstest.run((cmd_tshark, '-i', invalid_index, '-w', testout_file), capture_output=True, env=test_env) assert grep_output(process.stderr, 'There is no interface with that adapter index') class TestTsharkNameResolutionClopts: def test_tshark_valid_name_resolution(self, cmd_tshark, capture_file, test_env): # $TSHARK -N mnNtdv -a duration:1 > ./testout.txt 2>&1 - process = subprocess.run((cmd_tshark, + process = subprocesstest.run((cmd_tshark, '-r', capture_file('empty.pcap'), '-N', 'mnNtdv', ), env=test_env) @@ -149,32 +150,29 @@ class TestTsharkNameResolutionClopts: class TestTsharkUnicodeClopts: def test_tshark_unicode_display_filter(self, cmd_tshark, capture_file, test_env): '''Unicode (UTF-8) display filter''' - process = subprocess.run((cmd_tshark, '-r', capture_file('http.pcap'), '-Y', 'tcp.flags.str == "·······AP···"'), capture_output=True, encoding='utf-8', env=test_env) + process = subprocesstest.run((cmd_tshark, '-r', capture_file('http.pcap'), '-Y', 'tcp.flags.str == "·······AP···"'), capture_output=True, env=test_env) assert grep_output(process.stdout, 'HEAD.*/v4/iuident.cab') class TestTsharkDumpGlossaries: def test_tshark_dump_glossary(self, cmd_tshark, base_env): for glossary in glossaries: - process = subprocess.run((cmd_tshark, '-G', glossary), capture_output=True, encoding='utf-8', env=base_env) - assert count_output(process.stderr, 'Found error output while printing glossary ' + glossary) == 0 + process = subprocesstest.run((cmd_tshark, '-G', glossary), capture_output=True, env=base_env) + assert not process.stderr, 'Found error output while printing glossary ' + glossary def test_tshark_glossary_valid_utf8(self, cmd_tshark, base_env): for glossary in glossaries: env = base_env env['LANG'] = 'en_US.UTF-8' - g_contents = subprocess.check_output((cmd_tshark, '-G', glossary), env=env, stderr=subprocess.PIPE) - decoded = True - try: - g_contents.decode('UTF-8') - except UnicodeDecodeError: - decoded = False - assert decoded, '{} is not valid UTF-8'.format(glossary) + # subprocess.run() returns bytes here. + proc = subprocess.run((cmd_tshark, '-G', glossary), capture_output=True, env=env) + assert proc.returncode == 0 + proc.stdout.decode('UTF-8') def test_tshark_glossary_plugin_count(self, cmd_tshark, base_env, features): if not features.have_plugins: pytest.skip('Test requires binary plugin support.') - process = subprocess.run((cmd_tshark, '-G', 'plugins'), capture_output=True, encoding='utf-8', env=base_env) + process = subprocesstest.run((cmd_tshark, '-G', 'plugins'), capture_output=True, env=base_env) assert count_output(process.stdout, 'dissector') >= 10, 'Fewer than 10 dissector plugins found' def test_tshark_elastic_mapping(self, cmd_tshark, dirs, base_env): @@ -184,7 +182,7 @@ class TestTsharkDumpGlossaries: with open(baseline_file) as f: expected_obj = json.load(f) keys_to_check = get_ip_props(expected_obj).keys() - proc = subprocess.run((cmd_tshark, '-G', 'elastic-mapping', '--elastic-mapping-filter', 'ip'), capture_output=True, encoding='utf-8', env=base_env) + proc = subprocesstest.run((cmd_tshark, '-G', 'elastic-mapping', '--elastic-mapping-filter', 'ip'), capture_output=True, env=base_env) actual_obj = json.loads(proc.stdout) ip_props = get_ip_props(actual_obj) for key in list(ip_props.keys()): @@ -198,7 +196,7 @@ class TestTsharkDumpGlossaries: pytest.skip('Test requires Lua scripting support.') if sys.platform == 'win32' and not features.have_lua_unicode: pytest.skip('Test requires a patched Lua build with UTF-8 support.') - proc = subprocess.run((cmd_tshark, '-G', 'folders'), capture_output=True, encoding='utf-8', env=unicode_env.env) + proc = subprocesstest.run((cmd_tshark, '-G', 'folders'), capture_output=True, env=unicode_env.env) out = proc.stdout pluginsdir = [x.split('\t', 1)[1] for x in out.splitlines() if x.startswith('Personal Lua Plugins:')] assert [unicode_env.pluginsdir] == pluginsdir @@ -206,70 +204,70 @@ class TestTsharkDumpGlossaries: class TestTsharkZExpert: def test_tshark_z_expert_all(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert', - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert', + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert grep_output(proc.stdout, 'Errors') assert grep_output(proc.stdout, 'Warns') assert grep_output(proc.stdout, 'Chats') def test_tshark_z_expert_error(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,error', - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,error', + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert grep_output(proc.stdout, 'Errors') assert not grep_output(proc.stdout, 'Warns') assert not grep_output(proc.stdout, 'Chats') def test_tshark_z_expert_warn(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,warn', - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,warn', + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert grep_output(proc.stdout, 'Errors') assert grep_output(proc.stdout, 'Warns') assert not grep_output(proc.stdout, 'Chats') def test_tshark_z_expert_note(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,note', - '-r', capture_file('http2-data-reassembly.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,note', + '-r', capture_file('http2-data-reassembly.pcap')), capture_output=True, env=test_env) assert grep_output(proc.stdout, 'Warns') assert grep_output(proc.stdout, 'Notes') assert not grep_output(proc.stdout, 'Chats') def test_tshark_z_expert_chat(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,chat', - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,chat', + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert grep_output(proc.stdout, 'Errors') assert grep_output(proc.stdout, 'Warns') assert grep_output(proc.stdout, 'Chats') def test_tshark_z_expert_comment(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,comment', - '-r', capture_file('sip.pcapng')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,comment', + '-r', capture_file('sip.pcapng')), capture_output=True, env=test_env) assert grep_output(proc.stdout, 'Notes') assert grep_output(proc.stdout, 'Comments') def test_tshark_z_expert_invalid_filter(self, cmd_tshark, capture_file, test_env): invalid_filter = '__invalid_protocol' - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,' + invalid_filter, - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,' + invalid_filter, + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert proc.returncode == ExitCodes.COMMAND_LINE assert grep_output(proc.stdout, 'Filter "' + invalid_filter + '" is invalid') def test_tshark_z_expert_error_invalid_filter(self, cmd_tshark, capture_file, test_env): invalid_filter = '__invalid_protocol' - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,error,' + invalid_filter, - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,error,' + invalid_filter, + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert proc.returncode == ExitCodes.COMMAND_LINE assert grep_output(proc.stdout, 'Filter "' + invalid_filter + '" is invalid') def test_tshark_z_expert_filter(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,udp', # udp is a filter - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,udp', # udp is a filter + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert not grep_output(proc.stdout, 'Errors') assert not grep_output(proc.stdout, 'Warns') assert not grep_output(proc.stdout, 'Chats') def test_tshark_z_expert_error_filter(self, cmd_tshark, capture_file, test_env): - proc = subprocess.run((cmd_tshark, '-q', '-z', 'expert,error,udp', # udp is a filter - '-r', capture_file('http-ooo.pcap')), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-q', '-z', 'expert,error,udp', # udp is a filter + '-r', capture_file('http-ooo.pcap')), capture_output=True, env=test_env) assert not grep_output(proc.stdout, 'Errors') assert not grep_output(proc.stdout, 'Warns') assert not grep_output(proc.stdout, 'Chats') @@ -289,8 +287,8 @@ class TestTsharkExtcap: source_file = os.path.join(os.path.dirname(__file__), 'sampleif.py') shutil.copy2(source_file, extcap_dir_path) # Ensure the test extcap_tool is properly loaded - proc = subprocess.run((cmd_tshark, '-D'), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-D'), capture_output=True, env=test_env) assert count_output(proc.stdout, 'sampleif') == 1 # Ensure tshark lists 2 interfaces in the preferences - proc = subprocess.run((cmd_tshark, '-G', 'currentprefs'), capture_output=True, encoding='utf-8', env=test_env) + proc = subprocesstest.run((cmd_tshark, '-G', 'currentprefs'), capture_output=True, env=test_env) assert count_output(proc.stdout, 'extcap.sampleif.test') == 2