tool for anonymization of (telecom) PCAP files
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
osmo-pcap-reiniger/s1ap_reiniger.py

282 lines
8.0 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Reiniger: S1AP / NAS packet capture anonymizer.
#
# (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All rights reserved.
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Released under the terms of GNU General Public License,
# Version 2 or (at your option) any later version.
#
# Based on Scapy and pycrate:
# https://github.com/secdev/scapy
# https://github.com/P1sec/pycrate
import logging as log
import sys
from scapy.all import PcapReader, PcapWriter
from scapy.all import SCTPChunkData, NoPayload
from scapy.all import SCTP
from pycrate_asn1dir import S1AP
from pycrate_mobile import NAS
from pycrate_core.charpy import Charpy
IMSI_REPLACEMENT_BYTES = b'\x29\x26\x24' + b'\x00' * 5
def get_key_or_none(elem, k):
try:
id = elem[k]
return id
except ValueError:
return None
def has_key(elem, k):
try:
id = elem[k]
return True
except:
return False
def handle_nas_pdu(pdu, dl, regen = False):
log.debug("Processing %s NAS PDU: %s" % ("Downlink" if dl else "Uplink", pdu.hex()))
(msg, code) = NAS.parse_NASLTE_MT(pdu) if dl else NAS.parse_NASLTE_MO(pdu)
if code:
log.error("Failed to parse NAS payload")
return None
#print(msg.CLASS)
if has_key(msg, 'ID'):
id = msg['ID'][1]
#for k in id:
# print("--- %s, %s" % (k, k.show()))
id_type = id['Type'].get_val()
if id_type == 1: # IMSI
id.from_bytes(Charpy(IMSI_REPLACEMENT_BYTES))
else:
raise FooErr
print("+++ %s" % (id))
regen = True
# Try to find EPSID (may contain IMSI)
# TODO: also patch IMEI / IMEISV
if has_key(msg, 'EPSID'):
epsid = msg['EPSID'][1]
# Check if EPSID contains exactly IMSI
id_type = epsid['Type'].get_val()
print("ID type: %d" % (id_type))
if id_type == 1:
log.info("Cleaning %s" % epsid.repr())
# 262420000000000, Vodafone GmbH, Germany
epsid.from_bytes(Charpy(IMSI_REPLACEMENT_BYTES))
else:
raise FooErr
regen = True
return msg.to_bytes() if regen else pdu
def handle_s1ap_imsi(imsi):
log.info("Cleaning IMSI: %s" % NAS.decode_bcd(imsi))
# 262420000000000, Vodafone GmbH, Germany
return '\x29\x26\x24' + '\x00' * 5
def handle_s1ap_tmsi(tmsi):
log.info("Cleaning TMSI: %s" % tmsi.hex())
return tmsi # NOTE: for now, keep TMSI unchanged
def handle_s1ap_imeisv(imeisv):
log.info("Cleaning IMEISV: %s" % NAS.decode_bcd(imeisv))
# 3555720187847840 (Motorola C113)
return '\x33\x55\x75\x02\x81\x87\x74\x48\xf0'
def find_s1ap_ie(ie_list, ie_id):
for ie in ie_list:
if ie['id'] == ie_id:
return ie
return None
def find_s1ap_ies(ie_list, ie_ids):
for ie in ie_list:
if ie['id'] in ie_ids:
return ie
return None
def handle_s1ap_paging(msg):
ie = find_s1ap_ie(msg['protocolIEs'], S1AP.S1AP_Constants.id_UEPagingID.get_val())
if ie is None:
return
ueid = ie['value'][1]
if ueid[0] == 's-TMSI':
tmsi = handle_s1ap_tmsi(ueid[1]['m-TMSI'])
ueid[1]['m-TMSI'] = tmsi
elif ueid[0] == 'iMSI':
imsi = handle_s1ap_imsi(ueid[1])
# FIXME: I am not 100% sure if this is correct
ie['value'] = ('iMSI', imsi)
else:
log.warn("Unknown Paging identity type '%s'" % ueid[0])
def find_and_handle_s1ap_imei_ie(msg):
ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_Masked_IMEISV.get_val())
if ie is not None:
imeisv = handle_s1ap_imeisv(ie['value'][1])
# FIXME: I am not 100% sure if this is correct
ie['value'] = (ie['value'][0], imeisv)
def find_and_handle_s1ap_nas_pdu(msg, dl):
ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_NAS_PDU.get_val())
if ie is not None:
pdu = handle_nas_pdu(ie['value'][1], dl)
ie['value'] = ('NAS-PDU', pdu)
def find_and_handle_s1ap_nested_nas_pdu(msg):
IEs = {
S1AP.S1AP_Constants.id_E_RABToBeSetupListCtxtSUReq.get_val() : 'E-RABToBeSetupItemCtxtSUReq',
S1AP.S1AP_Constants.id_E_RABToBeSetupListBearerSUReq.get_val() : 'E-RABToBeSetupItemBearerSUReq',
S1AP.S1AP_Constants.id_E_RABToBeModifiedListBearerModReq.get_val() : 'E-RABToBeModifiedItemBearerModReq'}
ie = find_s1ap_ies(msg[1]['protocolIEs'], IEs.keys())
if ie is None:
return
for item in ie['value'][1]:
if item['value'][0] in IEs.values():
if 'nAS-PDU' in item['value'][1]:
pdu = handle_nas_pdu(item['value'][1]['nAS-PDU'], dl = True)
item['value'][1]['nAS-PDU'] = pdu
def handle_s1ap(msg, pkt_number):
log.info("Processing S1AP message '%s:%s' (Packet %d)" % (msg[0], msg[1]['value'][0], pkt_number))
# Paging may contain
if msg[0] != u'initiatingMessage':
return
# Look for IMSI, TMSI and IMEISV in S1AP
if msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_Paging.get_val():
handle_s1ap_paging(msg[1]['value'][1])
elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_InitialContextSetup.get_val():
find_and_handle_s1ap_imei_ie(msg[1]['value'])
elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_HandoverResourceAllocation.get_val():
find_and_handle_s1ap_imei_ie(msg[1]['value'])
# Look for NAS payload (which may also contain sensitive info)
procCodeListNested = ( # MME-originated NAS payload
S1AP.S1AP_Constants.id_E_RABSetup.get_val(),
S1AP.S1AP_Constants.id_E_RABModify.get_val(),
S1AP.S1AP_Constants.id_InitialContextSetup.get_val())
procCodeListDL = ( # MME-originated NAS payload
S1AP.S1AP_Constants.id_E_RABRelease.get_val(),
S1AP.S1AP_Constants.id_downlinkNASTransport.get_val())
procCodeListUL = ( # UE-originated payload
S1AP.S1AP_Constants.id_initialUEMessage.get_val(),
S1AP.S1AP_Constants.id_uplinkNASTransport.get_val(),
S1AP.S1AP_Constants.id_NASNonDeliveryIndication.get_val())
if msg[1]['procedureCode'] in procCodeListNested:
find_and_handle_s1ap_nested_nas_pdu(msg[1]['value'])
elif msg[1]['procedureCode'] in procCodeListDL:
find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = True)
elif msg[1]['procedureCode'] in procCodeListUL:
find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = False)
def handle_sctp_chunk(chunk, pkt_number):
log.debug("Processing an ASN.1 encoded S1AP PDU")
# Parse ASN.1 encoded S1AP PDU
s1ap = S1AP.S1AP_PDU_Descriptions.S1AP_PDU
# Guard against malformed packets
try:
s1ap.from_aper(chunk.data)
except:
log.error("Malformed packet, skipping...")
return False
try:
handle_s1ap(s1ap(), pkt_number)
except:
log.error("Failed to decode S1AP payload (Packet %d)", (pkt_number))
return False
# Encapsulate the new payload
# TODO: reset checksum fields
try:
# Encode the S1AP payload
payload = s1ap.to_aper()
chunk.data = payload
# Scapy will calculate the length and padding
del chunk.len
except:
log.error("Failed to encode a S1AP payload")
return False
return True
def handle_sctp_pkt(pkt, pkt_number):
# Find and decapsulate the S1AP payload
ip = pkt.payload
sctp = ip.payload
# There can be multiple chunks, look for Data with S1AP
chunk = sctp.payload
while not isinstance(chunk, NoPayload):
if isinstance(chunk, SCTPChunkData) and chunk.proto_id == 0x12:
success = handle_sctp_chunk(chunk, pkt_number)
if not success:
return None
chunk = chunk.payload
return pkt
def handle_pcap(src_path, dst_path = '/dev/null'):
i = 1
# Open the destination capture
dst_pcap = PcapWriter(dst_path)
# Open source capture (with sensitive info)
src_pcap = PcapReader(src_path)
# Get the first packet
pkt = src_pcap.read_packet()
# Process all packets in loop
while pkt:
# TODO: S1AP can be carried over TCP too
if pkt.haslayer(SCTP):
pkt = handle_sctp_pkt(pkt, i)
# Write potentially cleaned packet to the new file
if pkt is not None:
dst_pcap.write(pkt)
# Get the next packet
pkt = src_pcap.read_packet()
i += 1
if __name__ == '__main__':
# Configure logging format
# Example: [DEBUG] foo_bar.py:71 Mahlzeit!
LOG_FMT_DEFAULT = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s"
LOG_LEVEL_DEFAULT = log.INFO
# Default logging handler (stderr)
sh = log.StreamHandler()
sh.setLevel(log.getLevelName(LOG_LEVEL_DEFAULT))
sh.setFormatter(log.Formatter(LOG_FMT_DEFAULT))
log.root.addHandler(sh)
# Set DEBUG for the root logger
log.root.setLevel(log.DEBUG)
# TODO: use argparse
if len(sys.argv) < 3:
print("Usage: %s SRC_PCAP DST_PCAP" % sys.argv[0])
sys.exit(1)
handle_pcap(sys.argv[1], sys.argv[2])