wireshark/test/suite_capture.py

604 lines
24 KiB
Python

#
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Capture tests'''
import fixtures
import glob
import hashlib
import os
import socket
import subprocess
import subprocesstest
import sys
import threading
import time
import uuid
capture_duration = 5
testout_pcap = 'testout.pcap'
testout_pcapng = 'testout.pcapng'
snapshot_len = 96
class UdpTrafficGenerator(threading.Thread):
def __init__(self):
super().__init__(daemon=True)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.stopped = False
def run(self):
while not self.stopped:
time.sleep(.05)
self.sock.sendto(b'Wireshark test\n', ('127.0.0.1', 9))
def stop(self):
if not self.stopped:
self.stopped = True
self.join()
@fixtures.fixture
def traffic_generator():
'''
Traffic generator factory. Invoking it returns a tuple (start_func, cfilter)
where cfilter is a capture filter to match the generated traffic.
start_func can be invoked to start generating traffic and returns a function
which can be used to stop traffic generation early.
Currently generates a bunch of UDP traffic to localhost.
'''
threads = []
def start_processes():
thread = UdpTrafficGenerator()
thread.start()
threads.append(thread)
return thread.stop
try:
yield start_processes, 'udp port 9'
finally:
for thread in threads:
thread.stop()
@fixtures.fixture(scope='session')
def wireshark_k(wireshark_command):
return tuple(list(wireshark_command) + ['-k'])
def capture_command(*args, shell=False):
cmd_args = list(args)
if type(cmd_args[0]) != str:
# Assume something like ['wireshark', '-k']
cmd_args = list(cmd_args[0]) + list(cmd_args)[1:]
if sys.platform == "win32":
cmd_args[0] = '"{}"'.format(cmd_args[0])
if shell:
cmd_args = ' '.join(cmd_args)
return cmd_args
@fixtures.fixture
def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
start_traffic, cfilter = traffic_generator
def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
stop_traffic = start_traffic()
if to_stdout:
capture_proc = self.runProcess(capture_command(cmd,
'-i', '"{}"'.format(capture_interface),
'-p',
'-w', '-',
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', '"{}"'.format(cfilter),
'>', testout_file,
shell=True
),
shell=True
)
else:
capture_proc = self.runProcess(capture_command(cmd,
'-i', capture_interface,
'-p',
'-w', testout_file,
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', cfilter,
))
stop_traffic()
capture_returncode = capture_proc.returncode
if capture_returncode != 0:
self.log_fd.write('{} -D output:\n'.format(cmd))
self.runProcess((cmd, '-D'))
self.assertEqual(capture_returncode, 0)
self.checkPacketCount(10)
return check_capture_10_packets_real
@fixtures.fixture
def check_capture_fifo(cmd_dumpcap):
if sys.platform == 'win32':
fixtures.skip('Test requires OS fifo support.')
def check_capture_fifo_real(self, cmd=None):
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
fifo_file = self.filename_from_id('testout.fifo')
try:
# If a previous test left its fifo laying around, e.g. from a failure, remove it.
os.unlink(fifo_file)
except Exception:
pass
os.mkfifo(fifo_file)
slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
fifo_proc = self.startProcess(
('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
shell=True)
capture_proc = self.assertRun(capture_command(cmd,
'-i', fifo_file,
'-p',
'-w', testout_file,
'-a', 'duration:{}'.format(capture_duration),
))
fifo_proc.kill()
self.assertTrue(os.path.isfile(testout_file))
self.checkPacketCount(8)
return check_capture_fifo_real
@fixtures.fixture
def check_capture_stdin(cmd_dumpcap):
# Capturing always requires dumpcap, hence the dependency on it.
def check_capture_stdin_real(self, cmd=None):
# Similar to suite_io.check_io_4_packets.
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
capture_cmd = capture_command(cmd,
'-i', '-',
'-w', testout_file,
'-a', 'duration:{}'.format(capture_duration),
shell=True
)
is_gui = type(cmd) != str and '-k' in cmd[0]
if is_gui:
capture_cmd += ' --log-level=info'
pipe_proc = self.assertRun(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
if is_gui:
self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
self.assertTrue(os.path.isfile(testout_file))
self.checkPacketCount(8)
return check_capture_stdin_real
@fixtures.fixture
def check_capture_read_filter(capture_interface, traffic_generator):
start_traffic, cfilter = traffic_generator
def check_capture_read_filter_real(self, cmd=None):
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
stop_traffic = start_traffic()
capture_proc = self.assertRun(capture_command(cmd,
'-i', capture_interface,
'-p',
'-w', testout_file,
'-2',
'-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', cfilter,
))
stop_traffic()
self.checkPacketCount(0)
return check_capture_read_filter_real
@fixtures.fixture
def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
start_traffic, cfilter = traffic_generator
def check_capture_snapshot_len_real(self, cmd=None):
self.assertIsNotNone(cmd)
stop_traffic = start_traffic()
testout_file = self.filename_from_id(testout_pcap)
capture_proc = self.assertRun(capture_command(cmd,
'-i', capture_interface,
'-p',
'-w', testout_file,
'-s', str(snapshot_len),
'-a', 'duration:{}'.format(capture_duration),
'-f', cfilter,
))
stop_traffic()
self.assertTrue(os.path.isfile(testout_file))
# Use tshark to filter out all packets larger than 68 bytes.
testout2_file = self.filename_from_id('testout2.pcap')
filter_proc = self.assertRun((cmd_tshark,
'-r', testout_file,
'-w', testout2_file,
'-Y', 'frame.cap_len>{}'.format(snapshot_len),
))
self.checkPacketCount(0, cap_file=testout2_file)
return check_capture_snapshot_len_real
@fixtures.fixture
def check_dumpcap_autostop_stdin(cmd_dumpcap):
def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
# Similar to check_capture_stdin.
testout_file = self.filename_from_id(testout_pcap)
cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
condition='oops:invalid'
self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
if packets is not None:
condition = 'packets:{}'.format(packets)
elif filesize is not None:
condition = 'filesize:{}'.format(filesize)
cmd_ = '"{}"'.format(cmd_dumpcap)
capture_cmd = ' '.join((cmd_,
'-i', '-',
'-w', testout_file,
'-a', condition,
))
pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
self.assertTrue(os.path.isfile(testout_file))
if packets is not None:
self.checkPacketCount(packets)
elif filesize is not None:
capturekb = os.path.getsize(testout_file) / 1000
self.assertGreaterEqual(capturekb, filesize)
return check_dumpcap_autostop_stdin_real
@fixtures.fixture
def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
# Similar to check_capture_stdin.
rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
condition='oops:invalid'
self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
if packets is not None:
condition = 'packets:{}'.format(packets)
elif filesize is not None:
condition = 'filesize:{}'.format(filesize)
cmd_ = '"{}"'.format(cmd_dumpcap)
capture_cmd = ' '.join((cmd_,
'-i', '-',
'-w', testout_file,
'-a', 'files:2',
'-b', condition,
))
pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
rb_files = glob.glob(testout_glob)
for rbf in rb_files:
self.cleanup_files.append(rbf)
self.assertEqual(len(rb_files), 2)
for rbf in rb_files:
self.assertTrue(os.path.isfile(rbf))
if packets is not None:
self.checkPacketCount(packets, cap_file=rbf)
elif filesize is not None:
capturekb = os.path.getsize(rbf) / 1000
self.assertGreaterEqual(capturekb, filesize)
return check_dumpcap_ringbuffer_stdin_real
@fixtures.fixture
def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, capture_file):
if sys.platform == 'win32':
fixtures.skip('Test requires OS fifo support.')
def check_dumpcap_pcapng_sections_real(self, multi_input=False, multi_output=False):
# Make sure we always test multiple SHBs in an input.
in_files_l = [ [
capture_file('many_interfaces.pcapng.1'),
capture_file('many_interfaces.pcapng.2')
] ]
if multi_input:
in_files_l.append([ capture_file('many_interfaces.pcapng.3') ])
fifo_files = []
fifo_procs = []
# Default values for our validity tests
check_val_d = {
'filename': None,
'packet_count': 0,
'idb_count': 0,
'ua_pt1_count': 0,
'ua_pt2_count': 0,
'ua_pt3_count': 0,
'ua_dc_count': 0,
}
check_vals = [ check_val_d ]
for in_files in in_files_l:
fifo_file = self.filename_from_id('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files) + 1))
fifo_files.append(fifo_file)
# If a previous test left its fifo laying around, e.g. from a failure, remove it.
try:
os.unlink(fifo_file)
except Exception: pass
os.mkfifo(fifo_file)
cat_cmd = subprocesstest.cat_cap_file_command(in_files)
fifo_procs.append(self.startProcess(('{0} > {1}'.format(cat_cmd, fifo_file)), shell=True))
if multi_output:
rb_unique = 'sections_rb_' + uuid.uuid4().hex[:6] # Random ID
testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
check_vals.append(check_val_d.copy())
# check_vals[]['filename'] will be filled in below
else:
testout_file = self.filename_from_id(testout_pcapng)
check_vals[0]['filename'] = testout_file
# Capture commands
if not multi_input and not multi_output:
# Passthrough SHBs, single output file
capture_cmd_args = (
'-i', fifo_files[0],
'-w', testout_file
)
check_vals[0]['packet_count'] = 79
check_vals[0]['idb_count'] = 22
check_vals[0]['ua_pt1_count'] = 1
check_vals[0]['ua_pt2_count'] = 1
elif not multi_input and multi_output:
# Passthrough SHBs, multiple output files
capture_cmd_args = (
'-i', fifo_files[0],
'-w', testout_file,
'-a', 'files:2',
'-b', 'packets:53'
)
check_vals[0]['packet_count'] = 53
check_vals[0]['idb_count'] = 11
check_vals[0]['ua_pt1_count'] = 1
check_vals[1]['packet_count'] = 26
check_vals[1]['idb_count'] = 22
check_vals[1]['ua_pt1_count'] = 1
check_vals[1]['ua_pt2_count'] = 1
elif multi_input and not multi_output:
# Dumpcap SHBs, single output file
capture_cmd_args = (
'-i', fifo_files[0],
'-i', fifo_files[1],
'-w', testout_file
)
check_vals[0]['packet_count'] = 88
check_vals[0]['idb_count'] = 35
check_vals[0]['ua_dc_count'] = 1
else:
# Dumpcap SHBs, multiple output files
capture_cmd_args = (
'-i', fifo_files[0],
'-i', fifo_files[1],
'-w', testout_file,
'-a', 'files:2',
'-b', 'packets:53'
)
check_vals[0]['packet_count'] = 53
check_vals[0]['idb_count'] = 13
check_vals[0]['ua_dc_count'] = 1
check_vals[1]['packet_count'] = 35
check_vals[1]['idb_count'] = 35
check_vals[1]['ua_dc_count'] = 1
capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args)
capture_proc = self.assertRun(capture_cmd)
for fifo_proc in fifo_procs: fifo_proc.kill()
rb_files = []
if multi_output:
rb_files = sorted(glob.glob(testout_glob))
self.assertEqual(len(rb_files), 2)
check_vals[0]['filename'] = rb_files[0]
check_vals[1]['filename'] = rb_files[1]
for rbf in rb_files:
self.cleanup_files.append(rbf)
self.assertTrue(os.path.isfile(rbf))
# Output tests
if not multi_input and not multi_output:
# Check strict bit-for-bit passthrough.
in_hash = hashlib.sha256()
out_hash = hashlib.sha256()
for in_file in in_files_l[0]:
in_cap_file = capture_file(in_file)
with open(in_cap_file, 'rb') as f:
in_hash.update(f.read())
with open(testout_file, 'rb') as f:
out_hash.update(f.read())
self.assertEqual(in_hash.hexdigest(), out_hash.hexdigest())
# many_interfaces.pcapng.1 : 64 packets written by "Passthrough test #1"
# many_interfaces.pcapng.2 : 15 packets written by "Passthrough test #2"
# many_interfaces.pcapng.3 : 9 packets written by "Passthrough test #3"
# Each has 11 interfaces.
idb_compare_eq = True
if multi_input and multi_output:
# Having multiple inputs forces the use of threads. In our
# case this means that non-packet block counts in the first
# file in is nondeterministic.
idb_compare_eq = False
for check_val in check_vals:
self.checkPacketCount(check_val['packet_count'], cap_file=check_val['filename'])
tshark_proc = self.assertRun(capture_command(cmd_tshark,
'-r', check_val['filename'],
'-V',
'-X', 'read_format:MIME Files Format'
))
# XXX Are there any other sanity checks we should run?
if idb_compare_eq:
self.assertEqual(self.countOutput(r'Block: Interface Description Block',
proc=tshark_proc), check_val['idb_count'])
else:
self.assertGreaterEqual(self.countOutput(r'Block: Interface Description Block',
proc=tshark_proc), check_val['idb_count'])
idb_compare_eq = True
self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #1',
proc=tshark_proc), check_val['ua_pt1_count'])
self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #2',
proc=tshark_proc), check_val['ua_pt2_count'])
self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #3',
proc=tshark_proc), check_val['ua_pt3_count'])
self.assertEqual(self.countOutput(r'Option: User Application = Dumpcap \(Wireshark\)',
proc=tshark_proc), check_val['ua_dc_count'])
return check_dumpcap_pcapng_sections_real
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets, make_screenshot_on_error):
'''Capture 10 packets from the network to a file using Wireshark'''
with make_screenshot_on_error():
check_capture_10_packets(self, cmd=wireshark_k)
# Wireshark doesn't currently support writing to stdout while capturing.
# def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
# '''Capture 10 packets from the network to stdout using Wireshark'''
# check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)
def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo, make_screenshot_on_error):
'''Capture from a fifo using Wireshark'''
with make_screenshot_on_error():
check_capture_fifo(self, cmd=wireshark_k)
def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin, make_screenshot_on_error):
'''Capture from stdin using Wireshark'''
with make_screenshot_on_error():
check_capture_stdin(self, cmd=wireshark_k)
def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len, make_screenshot_on_error):
'''Capture truncated packets using Wireshark'''
with make_screenshot_on_error():
check_capture_snapshot_len(self, cmd=wireshark_k)
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture(subprocesstest.SubprocessTestCase):
def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
'''Capture 10 packets from the network to a file using TShark'''
check_capture_10_packets(self, cmd=cmd_tshark)
def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
'''Capture 10 packets from the network to stdout using TShark'''
check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)
def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
'''Capture from a fifo using TShark'''
check_capture_fifo(self, cmd=cmd_tshark)
def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
'''Capture from stdin using TShark'''
check_capture_stdin(self, cmd=cmd_tshark)
def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
'''Capture truncated packets using TShark'''
check_capture_snapshot_len(self, cmd=cmd_tshark)
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
'''Capture 10 packets from the network to a file using Dumpcap'''
check_capture_10_packets(self, cmd=cmd_dumpcap)
def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
'''Capture 10 packets from the network to stdout using Dumpcap'''
check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)
def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
'''Capture from a fifo using Dumpcap'''
check_capture_fifo(self, cmd=cmd_dumpcap)
def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
'''Capture from stdin using Dumpcap'''
check_capture_stdin(self, cmd=cmd_dumpcap)
def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
'''Capture truncated packets using Dumpcap'''
check_capture_snapshot_len(self, cmd=cmd_dumpcap)
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
# duration, filesize, packets, files
def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
'''Capture from stdin using Dumpcap until we reach a file size limit'''
check_dumpcap_autostop_stdin(self, filesize=15)
def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
'''Capture from stdin using Dumpcap until we reach a packet limit'''
check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
# duration, interval, filesize, packets, files
def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
'''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
check_dumpcap_ringbuffer_stdin(self, filesize=15)
def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
'''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_pcapng_sections(subprocesstest.SubprocessTestCase):
def test_dumpcap_pcapng_single_in_single_out(self, check_dumpcap_pcapng_sections):
'''Capture from a single pcapng source using Dumpcap and write a single file'''
if sys.byteorder == 'big':
fixtures.skip('this test is supported on little endian only')
check_dumpcap_pcapng_sections(self)
def test_dumpcap_pcapng_single_in_multi_out(self, check_dumpcap_pcapng_sections):
'''Capture from a single pcapng source using Dumpcap and write two files'''
if sys.byteorder == 'big':
fixtures.skip('this test is supported on little endian only')
check_dumpcap_pcapng_sections(self, multi_output=True)
def test_dumpcap_pcapng_multi_in_single_out(self, check_dumpcap_pcapng_sections):
'''Capture from two pcapng sources using Dumpcap and write a single file'''
if sys.byteorder == 'big':
fixtures.skip('this test is supported on little endian only')
check_dumpcap_pcapng_sections(self, multi_input=True)
def test_dumpcap_pcapng_multi_in_multi_out(self, check_dumpcap_pcapng_sections):
'''Capture from two pcapng sources using Dumpcap and write two files'''
if sys.byteorder == 'big':
fixtures.skip('this test is supported on little endian only')
check_dumpcap_pcapng_sections(self, multi_input=True, multi_output=True)