ether: add a complete SCTP handler
This commit is contained in:
parent
88d44fd8aa
commit
0a500811d9
|
@ -4,6 +4,7 @@
|
|||
# * Version : 0.4
|
||||
# *
|
||||
# * Copyright 2016. Benoit Michau. ANSSI.
|
||||
# * Copyright 2020. Benoit Michau. P1Sec.
|
||||
# *
|
||||
# * This library is free software; you can redistribute it and/or
|
||||
# * modify it under the terms of the GNU Lesser General Public
|
||||
|
@ -33,8 +34,10 @@ from pycrate_core.base import *
|
|||
from pycrate_core.repr import *
|
||||
|
||||
# EthernetPacket decoder requires basic L2 / L3 objects for decoding
|
||||
from .IP import IPv4, ICMP, IPv6, UDP, TCP
|
||||
from .ARP import ARP
|
||||
from .IP import IPv4, ICMP, IPv6, UDP, TCP
|
||||
from .SCTP import SCTP
|
||||
from .ARP import ARP
|
||||
|
||||
|
||||
EtherType_dict = {
|
||||
0x0800 : 'IPv4',
|
||||
|
@ -127,24 +130,31 @@ class VLAN(Envelope):
|
|||
# /IPv4 /ICMP
|
||||
# /UDP
|
||||
# /TCP
|
||||
# /SCTP
|
||||
# /IPv6 /UDP
|
||||
# /TCP
|
||||
# /SCTP
|
||||
|
||||
class EthernetPacket(Envelope):
|
||||
|
||||
_GEN = (
|
||||
Ethernet(),
|
||||
)
|
||||
|
||||
def _from_char(self, char):
|
||||
self.__init__()
|
||||
# Ethernet layer
|
||||
self[0]._from_char(char)
|
||||
typ = self[0][2].get_val()
|
||||
hier = 1
|
||||
# potential VLAN layer
|
||||
if typ == 0x8100:
|
||||
vlan = VLAN(hier=hier)
|
||||
hier += 1
|
||||
vlan._from_char(char)
|
||||
self.append(vlan)
|
||||
typ = vlan[3]._val
|
||||
# ARP or IP layer
|
||||
if typ == 0x0806:
|
||||
arp = ARP(hier=hier)
|
||||
hier += 1
|
||||
|
@ -162,6 +172,7 @@ class EthernetPacket(Envelope):
|
|||
ip._from_char(char)
|
||||
self.append(ip)
|
||||
typ = typ._val
|
||||
# ICMP, UDP, TCP or SCTP layer
|
||||
if typ == 1:
|
||||
icmp = ICMP(hier=hier)
|
||||
hier += 1
|
||||
|
@ -178,8 +189,21 @@ class EthernetPacket(Envelope):
|
|||
hier += 1
|
||||
udp._from_char(char)
|
||||
self.append(udp)
|
||||
data = Buf('Data', hier=hier)
|
||||
hier += 1
|
||||
data._from_char(char)
|
||||
self.append(data)
|
||||
elif typ == 132:
|
||||
sctp = SCTP(hier=hier)
|
||||
hier += 1
|
||||
sctp._from_char(char)
|
||||
self.append(sctp)
|
||||
# remaining higher layer undecoded data
|
||||
if char.len_byte():
|
||||
data = Buf('Data', hier=hier)
|
||||
hier += 1
|
||||
data._from_char(char)
|
||||
if isinstance(self[-1], IPv4):
|
||||
# remove the proto field automation
|
||||
self[-1]['proto'].set_valauto(None)
|
||||
elif isinstance(self[-1], IPv6):
|
||||
# remove the next field automation
|
||||
self[-1]['next'].set_valauto(None)
|
||||
self.append(data)
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ except ImportError:
|
|||
print('[pycrate_ether/IP.py] inet_pton() and inet_ntop() not available')
|
||||
|
||||
from pycrate_core.utils import reverse_dict
|
||||
from pycrate_core.elt import Envelope, Array, REPR_RAW, REPR_HEX, REPR_BIN
|
||||
from pycrate_core.elt import Envelope, Sequence, REPR_RAW, REPR_HEX, REPR_BIN
|
||||
from pycrate_core.base import *
|
||||
from pycrate_core.repr import *
|
||||
|
||||
|
@ -270,43 +270,35 @@ class IPv4Option(Envelope):
|
|||
_GEN = (
|
||||
Uint8('CCN', dic=IPv4Opt_dict),
|
||||
Uint8('len'), # trans and val automated
|
||||
Buf('val', val=b'') # trans and bl automated
|
||||
Buf('val', val=b'', rep=REPR_HEX) # trans and bl automated
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Envelope.__init__(self, *args, **kwargs)
|
||||
self[1].set_transauto(self._set_trans)
|
||||
self[1].set_valauto(self._set_len_val)
|
||||
self[2].set_transauto(self._set_trans)
|
||||
self[2].set_blauto(self._set_val_bl)
|
||||
|
||||
def _set_trans(self):
|
||||
if self[0].get_val() in (0, 1):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _set_len_val(self):
|
||||
return 2 + self[2].get_len()
|
||||
|
||||
def _set_val_bl(self):
|
||||
return max(0, 8 * (self[1].get_val()-2))
|
||||
self[1].set_transauto(lambda: True if self[0]() in (0, 1) else False)
|
||||
self[1].set_valauto(lambda: 2 + self[2].get_len())
|
||||
self[2].set_transauto(lambda: True if self[0]() in (0, 1) else False)
|
||||
self[2].set_blauto(lambda: max(0, (self[1]()-2)<<3))
|
||||
|
||||
|
||||
class IPv4Options(Envelope):
|
||||
_GEN = (
|
||||
Array('opts', GEN=IPv4Option()),
|
||||
Sequence('opts', GEN=IPv4Option()),
|
||||
Buf('pad') # bl automated
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Envelope.__init__(self, *args, **kwargs)
|
||||
self[1].set_blauto(self._set_pad_bl)
|
||||
self[1].set_blauto(lambda: self._pad_bl())
|
||||
|
||||
def _set_pad_bl(self):
|
||||
def _pad_bl(self):
|
||||
opts_ext = self[0].get_bl() % 32
|
||||
if opts_ext:
|
||||
return 32 - opts_ext
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
class IPv4(Envelope):
|
||||
_GEN = (
|
||||
Uint('vers', val=4, bl=4),
|
||||
|
@ -327,61 +319,53 @@ class IPv4(Envelope):
|
|||
Uint16('hdr_cs', rep=REPR_HEX), # val automated
|
||||
Buf('src', val=b'\x7f\0\0\x01', bl=32, rep=REPR_HEX),
|
||||
Buf('dst', val=b'\x7f\0\0\x01', bl=32, rep=REPR_HEX),
|
||||
Buf('opt', val=b''), # bl automated
|
||||
Buf('opt', val=b'', rep=REPR_HEX), # bl automated
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
# enable to pass IPv4 addr in human-readable format
|
||||
if 'val' in kwargs:
|
||||
# enable to pass IPv4 addr in human-readable format and convert them
|
||||
if 'src' in kwargs['val'] and len(kwargs['val']['src']) > 4:
|
||||
try:
|
||||
kwargs['val']['src'] = inet_aton(kwargs['val']['src'])
|
||||
except:
|
||||
except Exception:
|
||||
pass
|
||||
if 'dst' in kwargs['val'] and len(kwargs['val']['dst']) > 4:
|
||||
try:
|
||||
kwargs['val']['dst'] = inet_aton(kwargs['val']['dst'])
|
||||
except:
|
||||
except Exception:
|
||||
pass
|
||||
Envelope.__init__(self, *args, **kwargs)
|
||||
self[1].set_valauto(self._set_hdrwlen_val)
|
||||
self['hdr_wlen'].set_valauto(lambda: 5 + (self['opt'].get_len()>>2))
|
||||
if 'val' not in kwargs or 'len' not in kwargs['val']:
|
||||
self[7].set_valauto(self._set_len_val)
|
||||
self['len'].set_valauto(lambda: self._len_val())
|
||||
if 'val' not in kwargs or 'proto' not in kwargs['val']:
|
||||
self[14].set_valauto(self._set_prot_val)
|
||||
self[15].set_valauto(self.checksum)
|
||||
self[18].set_blauto(self._set_opt_bl)
|
||||
self['proto'].set_valauto(lambda: self._proto_val())
|
||||
self['hdr_cs'].set_valauto(lambda: checksum(self[:15].to_bytes() + b'\0\0' + self[16:].to_bytes()))
|
||||
self['opt'].set_blauto(lambda: max(0, (self['hdr_wlen'].get_val()-5)<<5))
|
||||
|
||||
def _set_hdrwlen_val(self):
|
||||
return 5 + (self[18].get_len() // 4)
|
||||
|
||||
def _set_len_val(self):
|
||||
def _len_val(self):
|
||||
pay = self.get_payload()
|
||||
if pay is not None:
|
||||
return 20 + self[18].get_len() + pay.get_len()
|
||||
return 20 + self['opt'].get_len() + pay.get_len()
|
||||
else:
|
||||
return 20 + self[18].get_len()
|
||||
return 20 + self['opt'].get_len()
|
||||
|
||||
def _set_prot_val(self):
|
||||
def _proto_val(self):
|
||||
pay = self.get_payload()
|
||||
if pay is not None and pay[0]._name in IPProtRev_dict:
|
||||
return IPProtRev_dict[pay[0]._name]
|
||||
else:
|
||||
return 0xff
|
||||
|
||||
def checksum(self):
|
||||
return checksum(self[:15].to_bytes() + b'\0\0' + self[16:].to_bytes())
|
||||
|
||||
def _set_opt_bl(self):
|
||||
return max(0, 32 * (self[1].get_val()-5))
|
||||
|
||||
def _from_char(self, char):
|
||||
Envelope._from_char(self, char)
|
||||
opts_buf = self[18].get_val()
|
||||
opts_buf = self['opt'].get_val()
|
||||
if opts_buf:
|
||||
opts = IPv4Options()
|
||||
opts = IPv4Options('opt')
|
||||
opts.from_bytes(opts_buf)
|
||||
if opts.to_bytes() == opts_buf:
|
||||
self.replace(self[18], opts)
|
||||
self.replace(self['opt'], opts)
|
||||
|
||||
|
||||
class ICMP(Envelope):
|
||||
|
@ -391,16 +375,10 @@ class ICMP(Envelope):
|
|||
Uint16('cs', rep=REPR_HEX), # val automated
|
||||
Buf('data', val=b'\0\0coucou')
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Envelope.__init__(self, *args, **kwargs)
|
||||
self[2].set_valauto(self.checksum)
|
||||
|
||||
def checksum(self):
|
||||
cs_old = self[2]._val
|
||||
self[2]._val = 0
|
||||
icmp = self.to_bytes()
|
||||
self[2]._val = cs_old
|
||||
return checksum(icmp)
|
||||
self[2].set_valauto(lambda: checksum(pack('>BB', self[0](), self[1]()) + b'\0\0' + self[3].to_bytes()))
|
||||
|
||||
|
||||
class IPv6(Envelope):
|
||||
|
@ -414,6 +392,7 @@ class IPv6(Envelope):
|
|||
Buf('src', val=15*b'\0'+b'\x01', bl=128, rep=REPR_HEX),
|
||||
Buf('dst', val=15*b'\0'+b'\x01', bl=128, rep=REPR_HEX)
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
# enable to pass IPv4 addr in human-readable format
|
||||
if 'val' in kwargs:
|
||||
|
@ -429,24 +408,26 @@ class IPv6(Envelope):
|
|||
pass
|
||||
Envelope.__init__(self, *args, **kwargs)
|
||||
if 'val' not in kwargs or 'plen' not in kwargs['val']:
|
||||
self[3].set_valauto(self._set_plen_val)
|
||||
self[3].set_valauto(lambda: self._plen_val())
|
||||
if 'val' not in kwargs or 'next' not in kwargs['val']:
|
||||
self[4].set_valauto(self._set_next_val)
|
||||
self[4].set_valauto(lambda: self._next_val())
|
||||
|
||||
def _set_plen_val(self):
|
||||
def _plen_val(self):
|
||||
pay = self.get_payload()
|
||||
if pay is not None:
|
||||
return pay.get_len()
|
||||
else:
|
||||
return 0
|
||||
|
||||
def _set_next_val(self):
|
||||
def _next_val(self):
|
||||
pay = self.get_payload()
|
||||
if pay is not None and pay[0]._name in IPProtRev_dict:
|
||||
return IPProtRev_dict[pay[0]._name]
|
||||
else:
|
||||
return 0xff
|
||||
|
||||
# TODO: define structures for IPv6 options
|
||||
|
||||
|
||||
class UDP(Envelope):
|
||||
_CS_OFF = False # for checksum offload
|
||||
|
@ -456,13 +437,14 @@ class UDP(Envelope):
|
|||
Uint16('len'), # val automated, unless initialized to fixed value
|
||||
Uint16('cs', rep=REPR_HEX) # val automated
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Envelope.__init__(self, *args, **kwargs)
|
||||
if 'val' not in kwargs or 'len' not in kwargs['val']:
|
||||
self[2].set_valauto(self._set_len_val)
|
||||
self[3].set_valauto(self.checksum)
|
||||
self[2].set_valauto(lambda: self._len_val())
|
||||
self[3].set_valauto(lambda: self.checksum())
|
||||
|
||||
def _set_len_val(self):
|
||||
def _len_val(self):
|
||||
pay = self.get_payload()
|
||||
if pay is not None:
|
||||
return 8 + pay.get_len()
|
||||
|
@ -473,36 +455,30 @@ class UDP(Envelope):
|
|||
if self._CS_OFF:
|
||||
return 0
|
||||
# get UDP and payload buffer
|
||||
udp = pack('>HHH', self[0](), self[1](), self[2]()) + b'\0\0'
|
||||
pay = self.get_payload()
|
||||
if pay is not None:
|
||||
udp = b''.join((self[:3].to_bytes(),
|
||||
b'\0\0',
|
||||
pay.to_bytes()))
|
||||
else:
|
||||
udp = self[:3].to_bytes() + b'\0\0'
|
||||
udp += pay.to_bytes()
|
||||
# get pseudo hdr buffer
|
||||
ludp = len(udp)
|
||||
hdr = self.get_header()
|
||||
while hdr is not None and not isinstance(hdr, (IPv4, IPv6)):
|
||||
# this is mostly to jump over IPv6 options
|
||||
hdr = hdr.get_header()
|
||||
if hdr is not None:
|
||||
# keep only src addr, dst addr and prot
|
||||
# keep only src addr, dst addr and proto
|
||||
vers = hdr[0].get_val()
|
||||
if vers == 4:
|
||||
phdr = b''.join((hdr[16].to_bytes(),
|
||||
hdr[17].to_bytes(),
|
||||
b'\0\x11',
|
||||
pack('>H', ludp)))
|
||||
phdr = hdr['src'].to_bytes() + hdr['dst'].to_bytes() + b'\0\x11' + pack('>H', ludp)
|
||||
elif vers == 6:
|
||||
# WNG: in case of IPv6 routing header, dst addr is incorrect
|
||||
phdr = b''.join((hdr[6].to_bytes(),
|
||||
hdr[7].to_bytes(),
|
||||
b'\0\x11',
|
||||
pack('>H', ludp)))
|
||||
phdr = hdr['src'].to_bytes() + hdr['dst'].to_bytes() + b'\0\x11' + pack('>H', ludp)
|
||||
else:
|
||||
phdr = b''
|
||||
else:
|
||||
phdr = b''
|
||||
# compute checksum
|
||||
return checksum(b''.join((phdr, udp)))
|
||||
return checksum(phdr + udp)
|
||||
|
||||
|
||||
class TCP(Envelope):
|
||||
|
@ -526,56 +502,42 @@ class TCP(Envelope):
|
|||
Uint16('win', val=8192),
|
||||
Uint16('cs', rep=REPR_HEX), # val automated
|
||||
Uint16('urg'),
|
||||
Buf('opt', val=b'') # bl automated
|
||||
Buf('opt', val=b'', rep=REPR_HEX) # bl automated
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Envelope.__init__(self, *args, **kwargs)
|
||||
if 'val' not in kwargs or 'off' not in kwargs['val']:
|
||||
self[4].set_valauto(self._set_off_val)
|
||||
self[16].set_valauto(self.checksum)
|
||||
self[18].set_blauto(self._set_opt_bl)
|
||||
|
||||
def _set_off_val(self):
|
||||
return 5 + (self[18].get_len() // 4)
|
||||
self['off'].set_valauto(lambda: 5 + (self['opt'].get_len()>>2))
|
||||
self['cs'].set_valauto(lambda: self.checksum())
|
||||
self['opt'].set_blauto(lambda: max(0, (self['off'].get_val()-5)<<5))
|
||||
|
||||
def checksum(self):
|
||||
if self._CS_OFF:
|
||||
return 0
|
||||
# get TCP and payload buffer
|
||||
# get TCP header and payload buffer
|
||||
tcp = self[:16].to_bytes() + b'\0\0' + self[17].to_bytes() + self[18].to_bytes()
|
||||
pay = self.get_payload()
|
||||
if pay is not None:
|
||||
tcp = b''.join((self[:16].to_bytes(),
|
||||
b'\0\0',
|
||||
self[17:].to_bytes(),
|
||||
pay.to_bytes()))
|
||||
else:
|
||||
tcp = b''.join((self[:16].to_bytes(),
|
||||
b'\0\0',
|
||||
self[17:].to_bytes()))
|
||||
tcp += pay.to_bytes()
|
||||
# get pseudo hdr buffer
|
||||
ltcp = len(tcp)
|
||||
hdr = self.get_header()
|
||||
while hdr is not None and not isinstance(hdr, (IPv4, IPv6)):
|
||||
# this is mostly to jump over IPv6 options
|
||||
hdr = hdr.get_header()
|
||||
if hdr is not None:
|
||||
# keep only src addr, dst addr and prot
|
||||
vers = hdr[0].get_val()
|
||||
if vers == 4:
|
||||
phdr = b''.join((hdr[16].to_bytes(),
|
||||
hdr[17].to_bytes(),
|
||||
b'\0\x06',
|
||||
pack('>H', ltcp)))
|
||||
phdr = hdr['src'].to_bytes() + hdr['dst'].to_bytes() + b'\0\x06' + pack('>H', ltcp)
|
||||
elif vers == 6:
|
||||
# WNG: in case of IPv6 routing header, dst addr is incorrect
|
||||
phdr = b''.join((hdr[6].to_bytes(),
|
||||
hdr[7].to_bytes(),
|
||||
b'\0\x06',
|
||||
pack('>H', ltcp)))
|
||||
phdr = hdr['src'].to_bytes() + hdr['dst'].to_bytes() + b'\0\x06' + pack('>H', ltcp)
|
||||
else:
|
||||
phdr = b''
|
||||
else:
|
||||
phdr = b''
|
||||
# compute checksum
|
||||
return checksum(phdr + tcp)
|
||||
|
||||
def _set_opt_bl(self):
|
||||
return max(0, 32 * (self[4].get_val()-5))
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -27,5 +27,5 @@
|
|||
# *--------------------------------------------------------
|
||||
#*/
|
||||
#
|
||||
__all__ = ['Ethernet', 'ARP', 'IP', 'PCAP']
|
||||
__all__ = ['Ethernet', 'ARP', 'IP', 'SCTP', 'PCAP']
|
||||
__version__ = '0.4.0'
|
||||
|
|
|
@ -40,43 +40,51 @@ TCP._CS_OFF = False
|
|||
UDP._CS_OFF = False
|
||||
|
||||
# Some examples of Ethernet / ARP or IPv4 packets
|
||||
eth_arp = unhexlify(b'22334455667700112233445508060001080006040002001122334455c0a8000a223344556677c0a80001')
|
||||
eth_ipv4_udp_dns = unhexlify(b'0011223344552233445566770800450000469f4900003f115b02c0a80001c0a8000a0035cac100325d3f9ccd818000010001000000000469657466036f72670000010001c00c00010001000006f40004041fc62c')
|
||||
eth_ipv4_tcp_http = unhexlify(b'2233445566770011223344550800450001de94f4400040061928c0a8000a041fc62ccd460050418754bcd7b1410e8018001c929e00000101080a017ec07206520e5d474554202f20485454502f312e310d0a486f73743a20696574662e6f72670d0a557365722d4167656e743a204d6f7a696c6c612f352e3020285831313b205562756e74753b204c696e7578207838365f36343b2072763a34362e3029204765636b6f2f32303130303130312046697265666f782f34362e300d0a4163636570743a20746578742f68746d6c2c6170706c69636174696f6e2f7868746d6c2b786d6c2c6170706c69636174696f6e2f786d6c3b713d302e392c2a2f2a3b713d302e380d0a4163636570742d4c616e67756167653a20656e2d55532c656e3b713d302e350d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174650d0a436f6f6b69653a207374796c6553686565743d310d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a49662d4d6f6469666965642d53696e63653a204d6f6e2c2032352041707220323031362032303a32323a353620474d540d0a49662d4e6f6e652d4d617463683a2022343736372d353331353466313233393865632d677a6970220d0a43616368652d436f6e74726f6c3a206d61782d6167653d300d0a0d0a')
|
||||
# TODO: add some IPv6 packets
|
||||
eth_arp = unhexlify(b'22334455667700112233445508060001080006040002001122334455c0a8000a223344556677c0a80001')
|
||||
#
|
||||
eth_frames = (eth_arp, eth_ipv4_udp_dns, eth_ipv4_tcp_http)
|
||||
eth_ipv4_udp_dns = unhexlify(b'0011223344552233445566770800450000469f4900003f115b02c0a80001c0a8000a0035cac100325d3f9ccd818000010001000000000469657466036f72670000010001c00c00010001000006f40004041fc62c')
|
||||
eth_ipv4_tcp = unhexlify(b'8c1645d7cd67fcecda022e37080045000034ce6600007606ea81acd912ceac1b201901bbde4058e3443e62715904801001be295b00000101080a464a8cef0764ac97')
|
||||
eth_ipv4_tcp_http = unhexlify(b'2233445566770011223344550800450001de94f4400040061928c0a8000a041fc62ccd460050418754bcd7b1410e8018001c929e00000101080a017ec07206520e5d474554202f20485454502f312e310d0a486f73743a20696574662e6f72670d0a557365722d4167656e743a204d6f7a696c6c612f352e3020285831313b205562756e74753b204c696e7578207838365f36343b2072763a34362e3029204765636b6f2f32303130303130312046697265666f782f34362e300d0a4163636570743a20746578742f68746d6c2c6170706c69636174696f6e2f7868746d6c2b786d6c2c6170706c69636174696f6e2f786d6c3b713d302e392c2a2f2a3b713d302e380d0a4163636570742d4c616e67756167653a20656e2d55532c656e3b713d302e350d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174650d0a436f6f6b69653a207374796c6553686565743d310d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a49662d4d6f6469666965642d53696e63653a204d6f6e2c2032352041707220323031362032303a32323a353620474d540d0a49662d4e6f6e652d4d617463683a2022343736372d353331353466313233393865632d677a6970220d0a43616368652d436f6e74726f6c3a206d61782d6167653d300d0a0d0a')
|
||||
eth_ipv4_sctp_data = unhexlify(b'a44bb2d9e3cef60babc3b51408004500038000c700004084dcfeb8de2670bbb6fe2f0ded0ded818195460ed7d1440003011de57876220001af940000000501000b010000010d006a88d8003ec34c00836daf93280981030e190b12080012044366905824560b1208001104638313686909d96281d648043d0176f06b1a2818060700118605010101a00d600ba1090607040000010019026c81b1a181ae02010002012e3081a5800831389689465127f08407911501574210f304818f0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000300dfe57876230001af950000000501000b01000000cf006a88d8003ec34d00836daf93c80981030e190b12070012045507842350430b12060011044340005450479b658198480410272e8c4904081608e46b262824060700118605010101a0196117a109060704000001000e03a203020100a305a1030201006c62a260020100305b020138a356a15430520410606161283ac926eb256cf41bfede584c04086013e4bb46cd1f0e04106aed5f0c8f12d661dbfae475c9fa767f04106207f65ace3942d0a3b041e81207bf520410ebba4f51ea7100000ec0abef5c71764700000300b0e57876240001af960000000501000b01000000a0006a88d8003ec34e00836daf93480981030e190b12950012044040051283600b12060011042602701363916c646a4904010404566c62a26002012c305b020138a356a1543052041013588fe881c390ec61a92684b5972bbc0408d54677a5743501ec041011bd061bcff52e3cba73751230c91d4504101eddca8180c13db4db57c62bf42e41680410dc5d88445f590000f9fa2c7e74fd0b25000300b0e57876250001af970000000501000b01000000a0006a88d8003ec34f00836daf93a80981030e190b12950012046647117122140b12060011049735408513006c646a4904010600c06c62a2600201b4305b020138a356a15430520410a0e24e93fc8e35bc37d4e75bf9c0fbe404085e8086949058e4c804106b92a335cf544b1c8009fa04f8bf26ad0410e0ecc07807abed861ac686cca2baf93b041023ab83c76f260000107d95c528097891')
|
||||
eth_ipv4_sctp_sack = unhexlify(b'a44bb2d9e3cef60babc3b51408004500022000cb00004084de5ab8de2670bbb6fe2f0ded0ded81819546ad72d90d03000010971c9c00000249f00000000000030124e57876260001af980000000501000b0100000114006a88db003ec35000836daf93280981030e190b12950012044304813588140b1206001104260270136391e06581dd480420254c6b490400d103306b262824060700118605010101a0196117a109060704000001002003a203020100a305a1030201006c81a6a181a30201a502010730819a8107912207134841f7830100b07d0500a179302a0201019002f12192031b921f940ca3eeb2df367f09bd3d6f33ee8009017396d4fe44810101820300640030220201039002f12192031b921f9404146bb4fc8009017396d4fe44810101820300640030270201089002f12192031b921f94092cd4737dd45da74e6e8009017396d4fe448101018203006400980100bf1f098607915461490007f9000300cce57876270001af990000000501000b01000000bc006a88db003ec35100836daf93880981030e190b12070012041895158224480b12060011043485864244118865818548041023536e4904d98e0cf66c77a175020102020107306da74da01504012b301030068301108401043006820110840104a015040121301030068301108401043006820110840104a11d0401923018300683011084010530068301208401043006820110840104ad1ca01a301530130a010202021b588007915531123384f8810100800101')
|
||||
eth_ipv4_sctp_init = unhexlify(b'0000000000000000000000000800450200440000400040843c327f0000017f0000012712271100000000ff16659a010000240ae91d2f0001a000000300034cc692eb000c00060005000080000004c0000004')
|
||||
eth_ipv4_sctp_shut = unhexlify(b'0000000000000000000000000800450200240003400040843c4f7f0000017f0000012712271129eadab86b6737600e000004')
|
||||
#
|
||||
eth_ipv6_mdns = unhexlify(b'3333000000fbe86a64e8da5a86dd600eac14003511fffe80000000000000dac6d10f0a1eb230ff0200000000000000000000000000fb14e914e900356d93000000000002000000000000055f69707073045f746370056c6f63616c00000c0001045f697070c012000c0001')
|
||||
eth_ipv6_icmp6 = unhexlify(b'3333ff00000054ab3a1bf16886dd6000000000203afffe80000000000000f667d256c7dda12cff0200000000000000000001ff0000008700c9250000000000000000000000000000000000000000010154ab3a1bf168')
|
||||
eth_ipv6_hopopt = unhexlify(b'333300000016ac88fd3a993886dd6000000000240001fe8000000000000010e8019ca4d1f03dff0200000000000000000000000000163a000100050200008f00c77c0000000104000000ff0200000000000000000000000000fb')
|
||||
#
|
||||
eth_frames = (
|
||||
eth_arp,
|
||||
eth_ipv4_udp_dns,
|
||||
eth_ipv4_tcp,
|
||||
eth_ipv4_tcp_http,
|
||||
eth_ipv4_sctp_data,
|
||||
eth_ipv4_sctp_sack,
|
||||
eth_ipv4_sctp_init,
|
||||
eth_ipv4_sctp_shut,
|
||||
eth_ipv6_mdns,
|
||||
eth_ipv6_icmp6,
|
||||
eth_ipv6_hopopt
|
||||
)
|
||||
|
||||
|
||||
def test_ip(eth_frames=eth_frames):
|
||||
def test_ether(eth_frames=eth_frames):
|
||||
for f in eth_frames:
|
||||
pkt = EthernetPacket()
|
||||
pkt.from_bytes(f)
|
||||
v = pkt.get_val()
|
||||
pkt.reautomate()
|
||||
assert( pkt.get_val() == v )
|
||||
assert( pkt.to_bytes() == f )
|
||||
|
||||
def test_eth(eth_frames=eth_frames):
|
||||
for f in eth_frames:
|
||||
pkt = EthernetPacket()
|
||||
pkt.from_bytes(f)
|
||||
|
||||
def test_perf_ether(eth_frames=eth_frames):
|
||||
|
||||
print('[+] instantiating and parsing Ethernet frames')
|
||||
Ta = timeit(test_eth, number=200)
|
||||
print('test_eth: {0:.4f}'.format(Ta))
|
||||
|
||||
print('[+] regenerating Ethernet frames')
|
||||
pkt = EthernetPacket()
|
||||
pkt.from_bytes(eth_ipv4_udp_dns)
|
||||
pkt.reautomate()
|
||||
Tb = timeit(pkt.to_bytes, number=200)
|
||||
pkt.from_bytes(eth_ipv4_tcp_http)
|
||||
pkt.reautomate()
|
||||
Tc = timeit(pkt.to_bytes, number=200)
|
||||
print('pkt.to_bytes: {0:.4f}'.format(Tb+Tc))
|
||||
|
||||
print('[+] test_ether total time: {0:.4f}'.format(Ta+Tb+Tc))
|
||||
print('[+] decoding and re-encoding Ethernet / IP frames')
|
||||
Ta = timeit(test_ether, number=100)
|
||||
print('test_ether: {0:.4f}'.format(Ta))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_perf_ether()
|
||||
|
|
|
@ -89,7 +89,7 @@ class TestPycrate(unittest.TestCase):
|
|||
# fmt_ip objects
|
||||
def test_ether(self):
|
||||
print('[<>] testing pycrate_ether')
|
||||
test_ip(eth_frames)
|
||||
test_ether(eth_frames)
|
||||
|
||||
# asn1c
|
||||
def test_asn1c(self):
|
||||
|
|
Loading…
Reference in New Issue