Initial import of s1ap_reiniger.py
This commit is contained in:
commit
9dcedc7cfd
|
@ -0,0 +1,242 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Reiniger: S1AP / NAS packet capture anonymizer.
|
||||
#
|
||||
# (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com>
|
||||
#
|
||||
# All rights reserved.
|
||||
#
|
||||
# SPDX-License-Identifier: GPL-2.0-or-later
|
||||
#
|
||||
# Released under the terms of GNU General Public License,
|
||||
# Version 2 or (at your option) any later version.
|
||||
#
|
||||
# Based on Scapy and pycrate:
|
||||
# https://github.com/secdev/scapy
|
||||
# https://github.com/P1sec/pycrate
|
||||
|
||||
import logging as log
|
||||
import sys
|
||||
|
||||
from scapy.all import PcapReader, PcapWriter
|
||||
from scapy.all import SCTPChunkData, NoPayload
|
||||
from scapy.all import SCTP
|
||||
|
||||
from pycrate_asn1dir import S1AP
|
||||
from pycrate_mobile import NAS
|
||||
|
||||
def handle_nas_pdu(pdu, dl, regen = False):
|
||||
log.debug("Processing %s NAS PDU: %s" % ("Downlink" if dl else "Uplink", pdu.encode('hex')))
|
||||
(msg, code) = NAS.parse_NASLTE_MT(pdu) if dl else NAS.parse_NASLTE_MO(pdu)
|
||||
if code:
|
||||
log.error("Failed to parse NAS payload")
|
||||
return None
|
||||
|
||||
# Try to find EPSID (may contain IMSI)
|
||||
# TODO: also patch IMEI / IMEISV
|
||||
try:
|
||||
epsid = msg['EPSID'][1]
|
||||
# Check if EPSID contains exactly IMSI
|
||||
if epsid[2].get_val() == 1:
|
||||
log.info("Cleaning %s" % epsid.repr())
|
||||
# 262420000000000, Vodafone GmbH, Germany
|
||||
epsid.from_bytes('\x29\x26\x24' + '\x00' * 5)
|
||||
regen = True
|
||||
except:
|
||||
pass
|
||||
|
||||
return msg.to_bytes() if regen else pdu
|
||||
|
||||
def handle_s1ap_imsi(imsi):
|
||||
log.info("Cleaning IMSI: %s" % NAS.decode_bcd(imsi))
|
||||
# 262420000000000, Vodafone GmbH, Germany
|
||||
return '\x29\x26\x24' + '\x00' * 5
|
||||
|
||||
def handle_s1ap_tmsi(tmsi):
|
||||
log.info("Cleaning TMSI: %s" % tmsi.encode('hex'))
|
||||
return tmsi # NOTE: for now, keep TMSI unchanged
|
||||
|
||||
def handle_s1ap_imeisv(imeisv):
|
||||
log.info("Cleaning IMEISV: %s" % NAS.decode_bcd(imeisv))
|
||||
# 3555720187847840 (Motorola C113)
|
||||
return '\x33\x55\x75\x02\x81\x87\x74\x48\xf0'
|
||||
|
||||
def find_s1ap_ie(ie_list, ie_id):
|
||||
for ie in ie_list:
|
||||
if ie['id'] == ie_id:
|
||||
return ie
|
||||
return None
|
||||
|
||||
def find_s1ap_ies(ie_list, ie_ids):
|
||||
for ie in ie_list:
|
||||
if ie['id'] in ie_ids:
|
||||
return ie
|
||||
return None
|
||||
|
||||
def handle_s1ap_paging(msg):
|
||||
ie = find_s1ap_ie(msg['protocolIEs'], S1AP.S1AP_Constants.id_UEPagingID.get_val())
|
||||
if ie is None:
|
||||
return
|
||||
|
||||
ueid = ie['value'][1]
|
||||
if ueid[0] == 's-TMSI':
|
||||
tmsi = handle_s1ap_tmsi(ueid[1]['m-TMSI'])
|
||||
ueid[1]['m-TMSI'] = tmsi
|
||||
elif ueid[0] == 'iMSI':
|
||||
imsi = handle_s1ap_imsi(ueid[1])
|
||||
# FIXME: I am not 100% sure if this is correct
|
||||
ie['value'] = ('iMSI', imsi)
|
||||
else:
|
||||
log.warn("Unknown Paging identity type '%s'" % ueid[0])
|
||||
|
||||
def find_and_handle_s1ap_imei_ie(msg):
|
||||
ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_Masked_IMEISV.get_val())
|
||||
if ie is not None:
|
||||
imeisv = handle_s1ap_imeisv(ie['value'][1])
|
||||
# FIXME: I am not 100% sure if this is correct
|
||||
ie['value'] = (ie['value'][0], imeisv)
|
||||
|
||||
def find_and_handle_s1ap_nas_pdu(msg, dl):
|
||||
ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_NAS_PDU.get_val())
|
||||
if ie is not None:
|
||||
pdu = handle_nas_pdu(ie['value'][1], dl)
|
||||
ie['value'] = ('NAS-PDU', pdu)
|
||||
|
||||
def find_and_handle_s1ap_nested_nas_pdu(msg):
|
||||
IEs = {
|
||||
S1AP.S1AP_Constants.id_E_RABToBeSetupListCtxtSUReq.get_val() : 'E-RABToBeSetupItemCtxtSUReq',
|
||||
S1AP.S1AP_Constants.id_E_RABToBeSetupListBearerSUReq.get_val() : 'E-RABToBeSetupItemBearerSUReq',
|
||||
S1AP.S1AP_Constants.id_E_RABToBeModifiedListBearerModReq.get_val() : 'E-RABToBeModifiedItemBearerModReq'}
|
||||
ie = find_s1ap_ies(msg[1]['protocolIEs'], IEs.keys())
|
||||
if ie is None:
|
||||
return
|
||||
|
||||
for item in ie['value'][1]:
|
||||
if item['value'][0] in IEs.values():
|
||||
pdu = handle_nas_pdu(item['value'][1]['nAS-PDU'], dl = True)
|
||||
item['value'][1]['nAS-PDU'] = pdu
|
||||
|
||||
def handle_s1ap(msg):
|
||||
log.info("Processing S1AP message '%s:%s'" % (msg[0], msg[1]['value'][0]))
|
||||
|
||||
# Paging may contain
|
||||
if msg[0] != u'initiatingMessage':
|
||||
return
|
||||
|
||||
# Look for IMSI, TMSI and IMEISV in S1AP
|
||||
if msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_Paging.get_val():
|
||||
handle_s1ap_paging(msg[1]['value'][1])
|
||||
elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_InitialContextSetup.get_val():
|
||||
find_and_handle_s1ap_imei_ie(msg[1]['value'])
|
||||
elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_HandoverResourceAllocation.get_val():
|
||||
find_and_handle_s1ap_imei_ie(msg[1]['value'])
|
||||
|
||||
# Look for NAS payload (which may also contain sensitive info)
|
||||
procCodeListNested = ( # MME-originated NAS payload
|
||||
S1AP.S1AP_Constants.id_E_RABSetup.get_val(),
|
||||
S1AP.S1AP_Constants.id_E_RABModify.get_val(),
|
||||
S1AP.S1AP_Constants.id_InitialContextSetup.get_val())
|
||||
procCodeListDL = ( # MME-originated NAS payload
|
||||
S1AP.S1AP_Constants.id_E_RABRelease.get_val(),
|
||||
S1AP.S1AP_Constants.id_downlinkNASTransport.get_val())
|
||||
procCodeListUL = ( # UE-originated payload
|
||||
S1AP.S1AP_Constants.id_initialUEMessage.get_val(),
|
||||
S1AP.S1AP_Constants.id_uplinkNASTransport.get_val(),
|
||||
S1AP.S1AP_Constants.id_NASNonDeliveryIndication.get_val())
|
||||
if msg[1]['procedureCode'] in procCodeListNested:
|
||||
find_and_handle_s1ap_nested_nas_pdu(msg[1]['value'])
|
||||
elif msg[1]['procedureCode'] in procCodeListDL:
|
||||
find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = True)
|
||||
elif msg[1]['procedureCode'] in procCodeListUL:
|
||||
find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = False)
|
||||
|
||||
def handle_sctp_chunk(chunk):
|
||||
log.debug("Processing an ASN.1 encoded S1AP PDU")
|
||||
|
||||
# Parse ASN.1 encoded S1AP PDU
|
||||
s1ap = S1AP.S1AP_PDU_Descriptions.S1AP_PDU
|
||||
# Guard against malformed packets
|
||||
try:
|
||||
s1ap.from_aper(chunk.data)
|
||||
except:
|
||||
log.error("Malformed packet, skipping...")
|
||||
return False
|
||||
|
||||
handle_s1ap(s1ap())
|
||||
|
||||
# Encapsulate the new payload
|
||||
# TODO: reset checksum fields
|
||||
try:
|
||||
# Encode the S1AP payload
|
||||
payload = s1ap.to_aper()
|
||||
chunk.data = payload
|
||||
|
||||
# Scapy will calculate the length and padding
|
||||
del chunk.len
|
||||
except:
|
||||
log.error("Failed to encode a S1AP payload")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def handle_sctp_pkt(pkt):
|
||||
# Find and decapsulate the S1AP payload
|
||||
ip = pkt.payload
|
||||
sctp = ip.payload
|
||||
|
||||
# There can be multiple chunks, look for Data with S1AP
|
||||
chunk = sctp.payload
|
||||
while not isinstance(chunk, NoPayload):
|
||||
if isinstance(chunk, SCTPChunkData) and chunk.proto_id == 0x12:
|
||||
success = handle_sctp_chunk(chunk)
|
||||
if not success:
|
||||
return None
|
||||
chunk = chunk.payload
|
||||
|
||||
return pkt
|
||||
|
||||
def handle_pcap(src_path, dst_path = '/dev/null'):
|
||||
# Open the destination capture
|
||||
dst_pcap = PcapWriter(dst_path)
|
||||
|
||||
# Open source capture (with sensitive info)
|
||||
src_pcap = PcapReader(src_path)
|
||||
|
||||
# Get the first packet
|
||||
pkt = src_pcap.read_packet()
|
||||
|
||||
# Process all packets in loop
|
||||
while pkt:
|
||||
# TODO: S1AP can be carried over TCP too
|
||||
if pkt.haslayer(SCTP):
|
||||
pkt = handle_sctp_pkt(pkt)
|
||||
|
||||
# Write potentially cleaned packet to the new file
|
||||
if pkt is not None:
|
||||
dst_pcap.write(pkt)
|
||||
|
||||
# Get the next packet
|
||||
pkt = src_pcap.read_packet()
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Configure logging format
|
||||
# Example: [DEBUG] foo_bar.py:71 Mahlzeit!
|
||||
LOG_FMT_DEFAULT = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s"
|
||||
LOG_LEVEL_DEFAULT = log.INFO
|
||||
|
||||
# Default logging handler (stderr)
|
||||
sh = log.StreamHandler()
|
||||
sh.setLevel(log.getLevelName(LOG_LEVEL_DEFAULT))
|
||||
sh.setFormatter(log.Formatter(LOG_FMT_DEFAULT))
|
||||
log.root.addHandler(sh)
|
||||
|
||||
# Set DEBUG for the root logger
|
||||
log.root.setLevel(log.DEBUG)
|
||||
|
||||
# TODO: use argparse
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: %s SRC_PCAP DST_PCAP" % sys.argv[0])
|
||||
sys.exit(1)
|
||||
|
||||
handle_pcap(sys.argv[1], sys.argv[2])
|
Loading…
Reference in New Issue